diff --git a/cmd/kube-controller-manager/app/extensions.go b/cmd/kube-controller-manager/app/extensions.go index f349ecbe212..a3218496fa7 100644 --- a/cmd/kube-controller-manager/app/extensions.go +++ b/cmd/kube-controller-manager/app/extensions.go @@ -36,7 +36,6 @@ func startDaemonSetController(ctx ControllerContext) (bool, error) { ctx.InformerFactory.Core().V1().Pods(), ctx.InformerFactory.Core().V1().Nodes(), ctx.ClientBuilder.ClientOrDie("daemon-set-controller"), - int(ctx.Options.LookupCacheSizeForDaemonSet), ).Run(int(ctx.Options.ConcurrentDaemonSetSyncs), ctx.Stop) return true, nil } diff --git a/cmd/kube-controller-manager/app/options/options.go b/cmd/kube-controller-manager/app/options/options.go index 92d851e9cc3..b0a3cb0cb85 100644 --- a/cmd/kube-controller-manager/app/options/options.go +++ b/cmd/kube-controller-manager/app/options/options.go @@ -134,7 +134,9 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet, allControllers []string, disabled fs.Int32Var(&s.ConcurrentSATokenSyncs, "concurrent-serviceaccount-token-syncs", s.ConcurrentSATokenSyncs, "The number of service account token objects that are allowed to sync concurrently. Larger number = more responsive token generation, but more CPU (and network) load") fs.Int32Var(&s.LookupCacheSizeForRC, "replication-controller-lookup-cache-size", s.LookupCacheSizeForRC, "This flag is deprecated and will be removed in future releases. ReplicationController no longer requires a lookup cache.") fs.Int32Var(&s.LookupCacheSizeForRS, "replicaset-lookup-cache-size", s.LookupCacheSizeForRS, "This flag is deprecated and will be removed in future releases. ReplicaSet no longer requires a lookup cache.") - fs.Int32Var(&s.LookupCacheSizeForDaemonSet, "daemonset-lookup-cache-size", s.LookupCacheSizeForDaemonSet, "The the size of lookup cache for daemonsets. Larger number = more responsive daemonsets, but more MEM load.") + // TODO: Remove the following flag 6 months after v1.6.0 is released. + fs.Int32Var(&s.LookupCacheSizeForDaemonSet, "daemonset-lookup-cache-size", s.LookupCacheSizeForDaemonSet, "This flag is deprecated and will be removed in future releases. DaemonSet no longer requires a lookup cache.") + fs.MarkDeprecated("daemonset-lookup-cache-size", "This flag is deprecated and will be removed in future releases. DaemonSet no longer requires a lookup cache.") fs.DurationVar(&s.ServiceSyncPeriod.Duration, "service-sync-period", s.ServiceSyncPeriod.Duration, "The period for syncing services with their external load balancers") fs.DurationVar(&s.NodeSyncPeriod.Duration, "node-sync-period", 0, ""+ "This flag is deprecated and will be removed in future releases. See node-monitor-period for Node health checking or "+ diff --git a/pkg/apis/componentconfig/types.go b/pkg/apis/componentconfig/types.go index 9c6d6afbf37..e3d229d7d4e 100644 --- a/pkg/apis/componentconfig/types.go +++ b/pkg/apis/componentconfig/types.go @@ -690,6 +690,7 @@ type KubeControllerManagerConfiguration struct { LookupCacheSizeForRS int32 // lookupCacheSizeForDaemonSet is the size of lookup cache for daemonsets. // Larger number = more responsive daemonset, but more MEM load. + // DEPRECATED: This is no longer used. LookupCacheSizeForDaemonSet int32 // serviceSyncPeriod is the period for syncing services with their external // load balancers. 
diff --git a/pkg/client/listers/extensions/internalversion/daemonset_expansion.go b/pkg/client/listers/extensions/internalversion/daemonset_expansion.go index c5073f3bca6..15c7cc7ea89 100644 --- a/pkg/client/listers/extensions/internalversion/daemonset_expansion.go +++ b/pkg/client/listers/extensions/internalversion/daemonset_expansion.go @@ -35,8 +35,9 @@ type DaemonSetListerExpansion interface { // DaemonSetNamespaeLister. type DaemonSetNamespaceListerExpansion interface{} -// GetPodDaemonSets returns a list of daemon sets managing a pod. -// Returns an error if and only if no matching daemon sets are found. +// GetPodDaemonSets returns a list of DaemonSets that potentially match a pod. +// Only the one specified in the Pod's ControllerRef will actually manage it. +// Returns an error only if no matching DaemonSets are found. func (s *daemonSetLister) GetPodDaemonSets(pod *api.Pod) ([]*extensions.DaemonSet, error) { var selector labels.Selector var daemonSet *extensions.DaemonSet diff --git a/pkg/client/listers/extensions/v1beta1/daemonset_expansion.go b/pkg/client/listers/extensions/v1beta1/daemonset_expansion.go index d3497414e99..b2144df4afc 100644 --- a/pkg/client/listers/extensions/v1beta1/daemonset_expansion.go +++ b/pkg/client/listers/extensions/v1beta1/daemonset_expansion.go @@ -35,8 +35,9 @@ type DaemonSetListerExpansion interface { // DaemonSetNamespaeLister. type DaemonSetNamespaceListerExpansion interface{} -// GetPodDaemonSets returns a list of daemon sets managing a pod. -// Returns an error if and only if no matching daemon sets are found. +// GetPodDaemonSets returns a list of DaemonSets that potentially match a pod. +// Only the one specified in the Pod's ControllerRef will actually manage it. +// Returns an error only if no matching DaemonSets are found. func (s *daemonSetLister) GetPodDaemonSets(pod *v1.Pod) ([]*v1beta1.DaemonSet, error) { var selector labels.Selector var daemonSet *v1beta1.DaemonSet diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go index e090ee93fbf..48d963b7951 100644 --- a/pkg/controller/controller_utils.go +++ b/pkg/controller/controller_utils.go @@ -400,8 +400,9 @@ func (r RealRSControl) PatchReplicaSet(namespace, name string, data []byte) erro type PodControlInterface interface { // CreatePods creates new pods according to the spec. CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object) error - // CreatePodsOnNode creates a new pod according to the spec on the specified node. - CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object) error + // CreatePodsOnNode creates a new pod according to the spec on the specified node, + // and sets the ControllerRef. + CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error // CreatePodsWithControllerRef creates new pods according to the spec, and sets object as the pod's controller. CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error // DeletePod deletes the pod identified by podID. 
@@ -466,11 +467,7 @@ func getPodsPrefix(controllerName string) string { return prefix } -func (r RealPodControl) CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object) error { - return r.createPods("", namespace, template, object, nil) -} - -func (r RealPodControl) CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error { +func validateControllerRef(controllerRef *metav1.OwnerReference) error { if controllerRef == nil { return fmt.Errorf("controllerRef is nil") } @@ -481,16 +478,30 @@ func (r RealPodControl) CreatePodsWithControllerRef(namespace string, template * return fmt.Errorf("controllerRef has empty Kind") } if controllerRef.Controller == nil || *controllerRef.Controller != true { - return fmt.Errorf("controllerRef.Controller is not set") + return fmt.Errorf("controllerRef.Controller is not set to true") } if controllerRef.BlockOwnerDeletion == nil || *controllerRef.BlockOwnerDeletion != true { return fmt.Errorf("controllerRef.BlockOwnerDeletion is not set") } + return nil +} + +func (r RealPodControl) CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object) error { + return r.createPods("", namespace, template, object, nil) +} + +func (r RealPodControl) CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error { + if err := validateControllerRef(controllerRef); err != nil { + return err + } return r.createPods("", namespace, template, controllerObject, controllerRef) } -func (r RealPodControl) CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object) error { - return r.createPods(nodeName, namespace, template, object, nil) +func (r RealPodControl) CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { + if err := validateControllerRef(controllerRef); err != nil { + return err + } + return r.createPods(nodeName, namespace, template, object, controllerRef) } func (r RealPodControl) PatchPod(namespace, name string, data []byte) error { @@ -613,10 +624,11 @@ func (f *FakePodControl) CreatePodsWithControllerRef(namespace string, spec *v1. 
return nil } -func (f *FakePodControl) CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object) error { +func (f *FakePodControl) CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { f.Lock() defer f.Unlock() f.Templates = append(f.Templates, *template) + f.ControllerRefs = append(f.ControllerRefs, *controllerRef) if f.Err != nil { return f.Err } diff --git a/pkg/controller/daemon/BUILD b/pkg/controller/daemon/BUILD index 1301d9f1043..d3ed2e45f51 100644 --- a/pkg/controller/daemon/BUILD +++ b/pkg/controller/daemon/BUILD @@ -73,11 +73,13 @@ go_test( "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/runtime", "//vendor:k8s.io/apimachinery/pkg/util/intstr", + "//vendor:k8s.io/apimachinery/pkg/util/uuid", "//vendor:k8s.io/apiserver/pkg/storage/names", "//vendor:k8s.io/apiserver/pkg/util/feature", "//vendor:k8s.io/client-go/testing", "//vendor:k8s.io/client-go/tools/cache", "//vendor:k8s.io/client-go/tools/record", + "//vendor:k8s.io/client-go/util/workqueue", ], ) diff --git a/pkg/controller/daemon/daemoncontroller.go b/pkg/controller/daemon/daemoncontroller.go index 62c100e4f60..8d86376593c 100644 --- a/pkg/controller/daemon/daemoncontroller.go +++ b/pkg/controller/daemon/daemoncontroller.go @@ -72,6 +72,9 @@ const ( FailedDaemonPodReason = "FailedDaemonPod" ) +// controllerKind contains the schema.GroupVersionKind for this controller type. +var controllerKind = extensions.SchemeGroupVersion.WithKind("DaemonSet") + // DaemonSetsController is responsible for synchronizing DaemonSet objects stored // in the system with actual running pods. type DaemonSetsController struct { @@ -105,13 +108,11 @@ type DaemonSetsController struct { // Added as a member to the struct to allow injection for testing. nodeStoreSynced cache.InformerSynced - lookupCache *controller.MatchingCache - // DaemonSet keys that need to be synced. queue workqueue.RateLimitingInterface } -func NewDaemonSetsController(daemonSetInformer extensionsinformers.DaemonSetInformer, podInformer coreinformers.PodInformer, nodeInformer coreinformers.NodeInformer, kubeClient clientset.Interface, lookupCacheSize int) *DaemonSetsController { +func NewDaemonSetsController(daemonSetInformer extensionsinformers.DaemonSetInformer, podInformer coreinformers.PodInformer, nodeInformer coreinformers.NodeInformer, kubeClient clientset.Interface) *DaemonSetsController { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) // TODO: remove the wrapper when every clients have moved to use the clientset. @@ -141,21 +142,6 @@ func NewDaemonSetsController(daemonSetInformer extensionsinformers.DaemonSetInfo UpdateFunc: func(old, cur interface{}) { oldDS := old.(*extensions.DaemonSet) curDS := cur.(*extensions.DaemonSet) - // We should invalidate the whole lookup cache if a DS's selector has been updated. - // - // Imagine that you have two RSs: - // * old DS1 - // * new DS2 - // You also have a pod that is attached to DS2 (because it doesn't match DS1 selector). 
- // Now imagine that you are changing DS1 selector so that it is now matching that pod, - // in such case we must invalidate the whole cache so that pod could be adopted by DS1 - // - // This makes the lookup cache less helpful, but selector update does not happen often, - // so it's not a big problem - if !reflect.DeepEqual(oldDS.Spec.Selector, curDS.Spec.Selector) { - dsc.lookupCache.InvalidateAll() - } - glog.V(4).Infof("Updating daemon set %s", oldDS.Name) dsc.enqueueDaemonSet(curDS) }, @@ -184,7 +170,6 @@ func NewDaemonSetsController(daemonSetInformer extensionsinformers.DaemonSetInfo dsc.syncHandler = dsc.syncDaemonSet dsc.enqueueDaemonSet = dsc.enqueue - dsc.lookupCache = controller.NewMatchingCache(lookupCacheSize) return dsc } @@ -273,78 +258,57 @@ func (dsc *DaemonSetsController) enqueueDaemonSetAfter(obj interface{}, after ti dsc.queue.AddAfter(key, after) } -func (dsc *DaemonSetsController) getPodDaemonSet(pod *v1.Pod) *extensions.DaemonSet { - // look up in the cache, if cached and the cache is valid, just return cached value - if obj, cached := dsc.lookupCache.GetMatchingObject(pod); cached { - ds, ok := obj.(*extensions.DaemonSet) - if !ok { - // This should not happen - utilruntime.HandleError(fmt.Errorf("lookup cache does not return a DaemonSet object")) - return nil - } - if dsc.isCacheValid(pod, ds) { - return ds - } - } +// getPodDaemonSets returns a list of DaemonSets that potentially match the pod. +func (dsc *DaemonSetsController) getPodDaemonSets(pod *v1.Pod) []*extensions.DaemonSet { sets, err := dsc.dsLister.GetPodDaemonSets(pod) if err != nil { - glog.V(4).Infof("No daemon sets found for pod %v, daemon set controller will avoid syncing", pod.Name) return nil } if len(sets) > 1 { - // More than two items in this list indicates user error. If two daemon - // sets overlap, sort by creation timestamp, subsort by name, then pick - // the first. + // ControllerRef will ensure we don't do anything crazy, but more than one + // item in this list nevertheless constitutes user error. utilruntime.HandleError(fmt.Errorf("user error! more than one daemon is selecting pods with labels: %+v", pod.Labels)) - sort.Sort(byCreationTimestamp(sets)) } - - // update lookup cache - dsc.lookupCache.Update(pod, sets[0]) - - return sets[0] -} - -// isCacheValid check if the cache is valid -func (dsc *DaemonSetsController) isCacheValid(pod *v1.Pod, cachedDS *extensions.DaemonSet) bool { - _, err := dsc.dsLister.DaemonSets(cachedDS.Namespace).Get(cachedDS.Name) - // ds has been deleted or updated, cache is invalid - if err != nil || !isDaemonSetMatch(pod, cachedDS) { - return false - } - return true -} - -// isDaemonSetMatch take a Pod and DaemonSet, return whether the Pod and DaemonSet are matching -// TODO(mqliang): This logic is a copy from GetPodDaemonSets(), remove the duplication -func isDaemonSetMatch(pod *v1.Pod, ds *extensions.DaemonSet) bool { - if ds.Namespace != pod.Namespace { - return false - } - selector, err := metav1.LabelSelectorAsSelector(ds.Spec.Selector) - if err != nil { - err = fmt.Errorf("invalid selector: %v", err) - return false - } - - // If a ReplicaSet with a nil or empty selector creeps in, it should match nothing, not everything.
- if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) { - return false - } - return true + return sets } func (dsc *DaemonSetsController) addPod(obj interface{}) { pod := obj.(*v1.Pod) - glog.V(4).Infof("Pod %s added.", pod.Name) - if ds := dsc.getPodDaemonSet(pod); ds != nil { - dsKey, err := controller.KeyFunc(ds) - if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", ds, err)) + + if pod.DeletionTimestamp != nil { + // on a restart of the controller manager, it's possible a new pod shows up in a state that + // is already pending deletion. Prevent the pod from being a creation observation. + dsc.deletePod(pod) + return + } + + // If it has a ControllerRef, that's all that matters. + if controllerRef := controller.GetControllerOf(pod); controllerRef != nil { + ds := dsc.resolveControllerRef(pod.Namespace, controllerRef) + if ds == nil { return } + dsKey, err := controller.KeyFunc(ds) + if err != nil { + return + } + glog.V(4).Infof("Pod %s added.", pod.Name) dsc.expectations.CreationObserved(dsKey) dsc.enqueueDaemonSet(ds) + return + } + + // Otherwise, it's an orphan. Get a list of all matching DaemonSets and sync + // them to see if anyone wants to adopt it. + // DO NOT observe creation because no controller should be waiting for an + // orphan. + dss := dsc.getPodDaemonSets(pod) + if len(dss) == 0 { + return + } + glog.V(4).Infof("Orphan Pod %s added.", pod.Name) + for _, ds := range dss { + dsc.enqueueDaemonSet(ds) } } @@ -359,24 +323,44 @@ func (dsc *DaemonSetsController) updatePod(old, cur interface{}) { // Two different versions of the same pod will always have different RVs. return } - glog.V(4).Infof("Pod %s updated.", curPod.Name) changedToReady := !v1.IsPodReady(oldPod) && v1.IsPodReady(curPod) - if curDS := dsc.getPodDaemonSet(curPod); curDS != nil { - dsc.enqueueDaemonSet(curDS) + labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels) - // See https://github.com/kubernetes/kubernetes/pull/38076 for more details - if changedToReady && curDS.Spec.MinReadySeconds > 0 { - dsc.enqueueDaemonSetAfter(curDS, time.Duration(curDS.Spec.MinReadySeconds)*time.Second) + curControllerRef := controller.GetControllerOf(curPod) + oldControllerRef := controller.GetControllerOf(oldPod) + controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef) + if controllerRefChanged && oldControllerRef != nil { + // The ControllerRef was changed. Sync the old controller, if any. + if ds := dsc.resolveControllerRef(oldPod.Namespace, oldControllerRef); ds != nil { + dsc.enqueueDaemonSet(ds) } } - // If the labels have not changed, then the daemon set responsible for - // the pod is the same as it was before. In that case we have enqueued the daemon - // set above, and do not have to enqueue the set again. - if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) { - // It's ok if both oldDS and curDS are the same, because curDS will set - // the expectations on its run so oldDS will have no effect. - if oldDS := dsc.getPodDaemonSet(oldPod); oldDS != nil { - dsc.enqueueDaemonSet(oldDS) + + // If it has a ControllerRef, that's all that matters. 
+ if curControllerRef != nil { + ds := dsc.resolveControllerRef(curPod.Namespace, curControllerRef) + if ds == nil { + return + } + glog.V(4).Infof("Pod %s updated.", curPod.Name) + dsc.enqueueDaemonSet(ds) + // See https://github.com/kubernetes/kubernetes/pull/38076 for more details + if changedToReady && ds.Spec.MinReadySeconds > 0 { + dsc.enqueueDaemonSetAfter(ds, time.Duration(ds.Spec.MinReadySeconds)*time.Second) + } + return + } + + // Otherwise, it's an orphan. If anything changed, sync matching controllers + // to see if anyone wants to adopt it now. + dss := dsc.getPodDaemonSets(curPod) + if len(dss) == 0 { + return + } + glog.V(4).Infof("Orphan Pod %s updated.", curPod.Name) + if labelChanged || controllerRefChanged { + for _, ds := range dss { + dsc.enqueueDaemonSet(ds) } } } @@ -391,25 +375,32 @@ func (dsc *DaemonSetsController) deletePod(obj interface{}) { if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj)) + utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj)) return } pod, ok = tombstone.Obj.(*v1.Pod) if !ok { - utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %#v", obj)) + utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj)) return } } + + controllerRef := controller.GetControllerOf(pod) + if controllerRef == nil { + // No controller should care about orphans being deleted. + return + } + ds := dsc.resolveControllerRef(pod.Namespace, controllerRef) + if ds == nil { + return + } + dsKey, err := controller.KeyFunc(ds) + if err != nil { + return + } glog.V(4).Infof("Pod %s deleted.", pod.Name) - if ds := dsc.getPodDaemonSet(pod); ds != nil { - dsKey, err := controller.KeyFunc(ds) - if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", ds, err)) - return - } - dsc.expectations.DeletionObserved(dsKey) - dsc.enqueueDaemonSet(ds) - } + dsc.expectations.DeletionObserved(dsKey) + dsc.enqueueDaemonSet(ds) } func (dsc *DaemonSetsController) addNode(obj interface{}) { @@ -462,30 +453,62 @@ func (dsc *DaemonSetsController) updateNode(old, cur interface{}) { } // getNodesToDaemonSetPods returns a map from nodes to daemon pods (corresponding to ds) running on the nodes. +// This also reconciles ControllerRef by adopting/orphaning. +// Note that returned Pods are pointers to objects in the cache. +// If you want to modify one, you need to deep-copy it first. func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *extensions.DaemonSet) (map[string][]*v1.Pod, error) { - nodeToDaemonPods := make(map[string][]*v1.Pod) selector, err := metav1.LabelSelectorAsSelector(ds.Spec.Selector) if err != nil { return nil, err } - daemonPods, err := dsc.podLister.Pods(ds.Namespace).List(selector) + + // List all pods to include those that don't match the selector anymore but + // have a ControllerRef pointing to this controller. + pods, err := dsc.podLister.Pods(ds.Namespace).List(labels.Everything()) if err != nil { - return nodeToDaemonPods, err + return nil, err } - for i := range daemonPods { - // TODO: Do we need to copy here? - daemonPod := &(*daemonPods[i]) - nodeName := daemonPod.Spec.NodeName - nodeToDaemonPods[nodeName] = append(nodeToDaemonPods[nodeName], daemonPod) + // Use ControllerRefManager to adopt/orphan as needed. 
+ cm := controller.NewPodControllerRefManager(dsc.podControl, ds, selector, controllerKind) + claimedPods, err := cm.ClaimPods(pods) + if err != nil { + return nil, err + } + // Group Pods by Node name. + nodeToDaemonPods := make(map[string][]*v1.Pod) + for _, pod := range claimedPods { + nodeName := pod.Spec.NodeName + nodeToDaemonPods[nodeName] = append(nodeToDaemonPods[nodeName], pod) } return nodeToDaemonPods, nil } +// resolveControllerRef returns the controller referenced by a ControllerRef, +// or nil if the ControllerRef could not be resolved to a matching controller +// of the correct Kind. +func (dsc *DaemonSetsController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *extensions.DaemonSet { + // We can't look up by UID, so look up by Name and then verify UID. + // Don't even try to look up by Name if it's the wrong Kind. + if controllerRef.Kind != controllerKind.Kind { + return nil + } + ds, err := dsc.dsLister.DaemonSets(namespace).Get(controllerRef.Name) + if err != nil { + return nil } + if ds.UID != controllerRef.UID { + // The controller we found with this Name is not the same one that the + // ControllerRef points to. + return nil + } + return ds +} + func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) error { - // Find out which nodes are running the daemon pods selected by ds. + // Find out which nodes are running the daemon pods controlled by ds. nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds) if err != nil { - return fmt.Errorf("error getting node to daemon pod mapping for daemon set %#v: %v", ds, err) + return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err) } // For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon @@ -586,7 +609,7 @@ func (dsc *DaemonSetsController) syncNodes(ds *extensions.DaemonSet, podsToDelet for i := 0; i < createDiff; i++ { go func(ix int) { defer createWait.Done() - if err := dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, &template, ds); err != nil { + if err := dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, &template, ds, newControllerRef(ds)); err != nil { glog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name) dsc.expectations.CreationObserved(dsKey) errCh <- err @@ -669,7 +692,7 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *extensions.DaemonSet) glog.V(4).Infof("Updating daemon set status") nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds) if err != nil { - return fmt.Errorf("error getting node to daemon pod mapping for daemon set %#v: %v", ds, err) + return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err) } nodeList, err := dsc.nodeLister.List(labels.Everything()) @@ -754,15 +777,17 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error { if err != nil { return fmt.Errorf("couldn't get key for object %#v: %v", ds, err) } - dsNeedsSync := dsc.expectations.SatisfiedExpectations(dsKey) - if dsNeedsSync && ds.DeletionTimestamp == nil { - if err := dsc.manage(ds); err != nil { - return err - } + if ds.DeletionTimestamp != nil || !dsc.expectations.SatisfiedExpectations(dsKey) { + // Only update status.
+ return dsc.updateDaemonSetStatus(ds) } - dsNeedsSync = dsc.expectations.SatisfiedExpectations(dsKey) - if dsNeedsSync && ds.DeletionTimestamp == nil { + if err := dsc.manage(ds); err != nil { + return err + } + + // Process rolling updates if we're ready. + if dsc.expectations.SatisfiedExpectations(dsKey) { switch ds.Spec.UpdateStrategy.Type { case extensions.RollingUpdateDaemonSetStrategyType: err = dsc.rollingUpdate(ds) @@ -857,7 +882,7 @@ func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *exten } // ignore pods that belong to the daemonset when taking into account whether // a daemonset should bind to a node. - if pds := dsc.getPodDaemonSet(pod); pds != nil && ds.Name == pds.Name { + if controllerRef := controller.GetControllerOf(pod); controllerRef != nil && controllerRef.UID == ds.UID { continue } pods = append(pods, pod) @@ -961,6 +986,20 @@ func Predicates(pod *v1.Pod, nodeInfo *schedulercache.NodeInfo) (bool, []algorit return len(predicateFails) == 0, predicateFails, nil } +// newControllerRef creates a ControllerRef pointing to the given DaemonSet. +func newControllerRef(ds *extensions.DaemonSet) *metav1.OwnerReference { + blockOwnerDeletion := true + isController := true + return &metav1.OwnerReference{ + APIVersion: controllerKind.GroupVersion().String(), + Kind: controllerKind.Kind, + Name: ds.Name, + UID: ds.UID, + BlockOwnerDeletion: &blockOwnerDeletion, + Controller: &isController, + } +} + // byCreationTimestamp sorts a list by creation timestamp, using their names as a tie breaker. type byCreationTimestamp []*extensions.DaemonSet diff --git a/pkg/controller/daemon/daemoncontroller_test.go b/pkg/controller/daemon/daemoncontroller_test.go index bb2abf7d6f2..120a6aa0de9 100644 --- a/pkg/controller/daemon/daemoncontroller_test.go +++ b/pkg/controller/daemon/daemoncontroller_test.go @@ -18,17 +18,22 @@ package daemon import ( "fmt" + "reflect" + "sort" + "strconv" "sync" "testing" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apiserver/pkg/storage/names" utilfeature "k8s.io/apiserver/pkg/util/feature" core "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/api/v1" @@ -80,6 +85,7 @@ func newDaemonSet(name string) *extensions.DaemonSet { return &extensions.DaemonSet{ TypeMeta: metav1.TypeMeta{APIVersion: testapi.Extensions.GroupVersion().String()}, ObjectMeta: metav1.ObjectMeta{ + UID: uuid.NewUUID(), Name: name, Namespace: metav1.NamespaceDefault, }, @@ -130,7 +136,7 @@ func addNodes(nodeStore cache.Store, startIndex, numNodes int, label map[string] } } -func newPod(podName string, nodeName string, label map[string]string) *v1.Pod { +func newPod(podName string, nodeName string, label map[string]string, ds *extensions.DaemonSet) *v1.Pod { pod := &v1.Pod{ TypeMeta: metav1.TypeMeta{APIVersion: api.Registry.GroupOrDie(v1.GroupName).GroupVersion.String()}, ObjectMeta: metav1.ObjectMeta{ @@ -152,18 +158,21 @@ func newPod(podName string, nodeName string, label map[string]string) *v1.Pod { }, } pod.Name = names.SimpleNameGenerator.GenerateName(podName) + if ds != nil { + pod.OwnerReferences = []metav1.OwnerReference{*newControllerRef(ds)} + } return pod } -func addPods(podStore cache.Store, nodeName string, label map[string]string, number int) { +func addPods(podStore cache.Store, 
nodeName string, label map[string]string, ds *extensions.DaemonSet, number int) { for i := 0; i < number; i++ { - podStore.Add(newPod(fmt.Sprintf("%s-", nodeName), nodeName, label)) + podStore.Add(newPod(fmt.Sprintf("%s-", nodeName), nodeName, label, ds)) } } -func addFailedPods(podStore cache.Store, nodeName string, label map[string]string, number int) { +func addFailedPods(podStore cache.Store, nodeName string, label map[string]string, ds *extensions.DaemonSet, number int) { for i := 0; i < number; i++ { - pod := newPod(fmt.Sprintf("%s-", nodeName), nodeName, label) + pod := newPod(fmt.Sprintf("%s-", nodeName), nodeName, label, ds) pod.Status = v1.PodStatus{Phase: v1.PodFailed} podStore.Add(pod) } @@ -183,10 +192,10 @@ func newFakePodControl() *fakePodControl { podIDMap: podIDMap} } -func (f *fakePodControl) CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object) error { +func (f *fakePodControl) CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { f.Lock() defer f.Unlock() - if err := f.FakePodControl.CreatePodsOnNode(nodeName, namespace, template, object); err != nil { + if err := f.FakePodControl.CreatePodsOnNode(nodeName, namespace, template, object, controllerRef); err != nil { return fmt.Errorf("failed to create pod on node %q", nodeName) } @@ -243,7 +252,6 @@ func newTestController(initialObjects ...runtime.Object) (*daemonSetsController, informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Nodes(), clientset, - 0, ) manager.eventRecorder = record.NewFakeRecorder(100) @@ -269,6 +277,22 @@ func validateSyncDaemonSets(t *testing.T, fakePodControl *fakePodControl, expect if len(fakePodControl.DeletePodName) != expectedDeletes { t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", expectedDeletes, len(fakePodControl.DeletePodName)) } + // Every Pod created should have a ControllerRef. + if got, want := len(fakePodControl.ControllerRefs), expectedCreates; got != want { + t.Errorf("len(ControllerRefs) = %v, want %v", got, want) + } + // Make sure the ControllerRefs are correct. 
+ for _, controllerRef := range fakePodControl.ControllerRefs { + if got, want := controllerRef.APIVersion, "extensions/v1beta1"; got != want { + t.Errorf("controllerRef.APIVersion = %q, want %q", got, want) + } + if got, want := controllerRef.Kind, "DaemonSet"; got != want { + t.Errorf("controllerRef.Kind = %q, want %q", got, want) + } + if controllerRef.Controller == nil || *controllerRef.Controller != true { + t.Errorf("controllerRef.Controller is not set to true") + } + } } func syncAndValidateDaemonSets(t *testing.T, manager *daemonSetsController, ds *extensions.DaemonSet, podControl *fakePodControl, expectedCreates, expectedDeletes int) { @@ -322,6 +346,32 @@ func TestSimpleDaemonSetLaunchesPods(t *testing.T) { syncAndValidateDaemonSets(t, manager, ds, podControl, 5, 0) } +func TestSimpleDaemonSetUpdatesStatusAfterLaunchingPods(t *testing.T) { + manager, podControl, clientset := newTestController() + + var updated *extensions.DaemonSet + clientset.PrependReactor("update", "daemonsets", func(action core.Action) (handled bool, ret runtime.Object, err error) { + if action.GetSubresource() != "status" { + return false, nil, nil + } + if u, ok := action.(core.UpdateAction); ok { + updated = u.GetObject().(*extensions.DaemonSet) + } + return false, nil, nil + }) + + ds := newDaemonSet("foo") + manager.dsStore.Add(ds) + addNodes(manager.nodeStore, 0, 5, nil) + syncAndValidateDaemonSets(t, manager, ds, podControl, 5, 0) + + // Make sure the single sync() updated Status already for the change made + // during the manage() phase. + if got, want := updated.Status.CurrentNumberScheduled, int32(5); got != want { + t.Errorf("Status.CurrentNumberScheduled = %v, want %v", got, want) + } +} + // DaemonSets should do nothing if there aren't any nodes func TestNoNodesDoesNothing(t *testing.T) { manager, podControl, _ := newTestController() @@ -521,16 +571,17 @@ func TestPortConflictWithSameDaemonPodDoesNotDeletePod(t *testing.T) { manager, podControl, _ := newTestController() node := newNode("port-conflict", nil) manager.nodeStore.Add(node) - manager.podStore.Add(&v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Labels: simpleDaemonSetLabel, - Namespace: metav1.NamespaceDefault, - }, - Spec: podSpec, - }) ds := newDaemonSet("foo") ds.Spec.Template.Spec = podSpec manager.dsStore.Add(ds) + manager.podStore.Add(&v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: simpleDaemonSetLabel, + Namespace: metav1.NamespaceDefault, + OwnerReferences: []metav1.OwnerReference{*newControllerRef(ds)}, + }, + Spec: podSpec, + }) syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0) } @@ -602,13 +653,13 @@ func TestPodIsNotDeletedByDaemonsetWithEmptyLabelSelector(t *testing.T) { // Controller should not create pods on nodes which have daemon pods, and should remove excess pods from nodes that have extra pods. 
func TestDealsWithExistingPods(t *testing.T) { manager, podControl, _ := newTestController() - addNodes(manager.nodeStore, 0, 5, nil) - addPods(manager.podStore, "node-1", simpleDaemonSetLabel, 1) - addPods(manager.podStore, "node-2", simpleDaemonSetLabel, 2) - addPods(manager.podStore, "node-3", simpleDaemonSetLabel, 5) - addPods(manager.podStore, "node-4", simpleDaemonSetLabel2, 2) ds := newDaemonSet("foo") manager.dsStore.Add(ds) + addNodes(manager.nodeStore, 0, 5, nil) + addPods(manager.podStore, "node-1", simpleDaemonSetLabel, ds, 1) + addPods(manager.podStore, "node-2", simpleDaemonSetLabel, ds, 2) + addPods(manager.podStore, "node-3", simpleDaemonSetLabel, ds, 5) + addPods(manager.podStore, "node-4", simpleDaemonSetLabel2, ds, 2) syncAndValidateDaemonSets(t, manager, ds, podControl, 2, 5) } @@ -626,34 +677,34 @@ func TestSelectorDaemonLaunchesPods(t *testing.T) { // Daemon with node selector should delete pods from nodes that do not satisfy selector. func TestSelectorDaemonDeletesUnselectedPods(t *testing.T) { manager, podControl, _ := newTestController() + ds := newDaemonSet("foo") + ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel + manager.dsStore.Add(ds) addNodes(manager.nodeStore, 0, 5, nil) addNodes(manager.nodeStore, 5, 5, simpleNodeLabel) - addPods(manager.podStore, "node-0", simpleDaemonSetLabel2, 2) - addPods(manager.podStore, "node-1", simpleDaemonSetLabel, 3) - addPods(manager.podStore, "node-1", simpleDaemonSetLabel2, 1) - addPods(manager.podStore, "node-4", simpleDaemonSetLabel, 1) - daemon := newDaemonSet("foo") - daemon.Spec.Template.Spec.NodeSelector = simpleNodeLabel - manager.dsStore.Add(daemon) - syncAndValidateDaemonSets(t, manager, daemon, podControl, 5, 4) + addPods(manager.podStore, "node-0", simpleDaemonSetLabel2, ds, 2) + addPods(manager.podStore, "node-1", simpleDaemonSetLabel, ds, 3) + addPods(manager.podStore, "node-1", simpleDaemonSetLabel2, ds, 1) + addPods(manager.podStore, "node-4", simpleDaemonSetLabel, ds, 1) + syncAndValidateDaemonSets(t, manager, ds, podControl, 5, 4) } // DaemonSet with node selector should launch pods on nodes matching selector, but also deal with existing pods on nodes. 
func TestSelectorDaemonDealsWithExistingPods(t *testing.T) { manager, podControl, _ := newTestController() - addNodes(manager.nodeStore, 0, 5, nil) - addNodes(manager.nodeStore, 5, 5, simpleNodeLabel) - addPods(manager.podStore, "node-0", simpleDaemonSetLabel, 1) - addPods(manager.podStore, "node-1", simpleDaemonSetLabel, 3) - addPods(manager.podStore, "node-1", simpleDaemonSetLabel2, 2) - addPods(manager.podStore, "node-2", simpleDaemonSetLabel, 4) - addPods(manager.podStore, "node-6", simpleDaemonSetLabel, 13) - addPods(manager.podStore, "node-7", simpleDaemonSetLabel2, 4) - addPods(manager.podStore, "node-9", simpleDaemonSetLabel, 1) - addPods(manager.podStore, "node-9", simpleDaemonSetLabel2, 1) ds := newDaemonSet("foo") ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel manager.dsStore.Add(ds) + addNodes(manager.nodeStore, 0, 5, nil) + addNodes(manager.nodeStore, 5, 5, simpleNodeLabel) + addPods(manager.podStore, "node-0", simpleDaemonSetLabel, ds, 1) + addPods(manager.podStore, "node-1", simpleDaemonSetLabel, ds, 3) + addPods(manager.podStore, "node-1", simpleDaemonSetLabel2, ds, 2) + addPods(manager.podStore, "node-2", simpleDaemonSetLabel, ds, 4) + addPods(manager.podStore, "node-6", simpleDaemonSetLabel, ds, 13) + addPods(manager.podStore, "node-7", simpleDaemonSetLabel2, ds, 4) + addPods(manager.podStore, "node-9", simpleDaemonSetLabel, ds, 1) + addPods(manager.podStore, "node-9", simpleDaemonSetLabel2, ds, 1) syncAndValidateDaemonSets(t, manager, ds, podControl, 3, 20) } @@ -740,7 +791,7 @@ func TestNodeAffinityDaemonLaunchesPods(t *testing.T) { } func TestNumberReadyStatus(t *testing.T) { - daemon := newDaemonSet("foo") + ds := newDaemonSet("foo") manager, podControl, clientset := newTestController() var updated *extensions.DaemonSet clientset.PrependReactor("update", "daemonsets", func(action core.Action) (handled bool, ret runtime.Object, err error) { @@ -753,31 +804,31 @@ func TestNumberReadyStatus(t *testing.T) { return false, nil, nil }) addNodes(manager.nodeStore, 0, 2, simpleNodeLabel) - addPods(manager.podStore, "node-0", simpleDaemonSetLabel, 1) - addPods(manager.podStore, "node-1", simpleDaemonSetLabel, 1) - manager.dsStore.Add(daemon) + addPods(manager.podStore, "node-0", simpleDaemonSetLabel, ds, 1) + addPods(manager.podStore, "node-1", simpleDaemonSetLabel, ds, 1) + manager.dsStore.Add(ds) - syncAndValidateDaemonSets(t, manager, daemon, podControl, 0, 0) + syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0) if updated.Status.NumberReady != 0 { t.Errorf("Wrong daemon %s status: %v", updated.Name, updated.Status) } - selector, _ := metav1.LabelSelectorAsSelector(daemon.Spec.Selector) - daemonPods, _ := manager.podLister.Pods(daemon.Namespace).List(selector) + selector, _ := metav1.LabelSelectorAsSelector(ds.Spec.Selector) + daemonPods, _ := manager.podLister.Pods(ds.Namespace).List(selector) for _, pod := range daemonPods { condition := v1.PodCondition{Type: v1.PodReady, Status: v1.ConditionTrue} pod.Status.Conditions = append(pod.Status.Conditions, condition) } - syncAndValidateDaemonSets(t, manager, daemon, podControl, 0, 0) + syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0) if updated.Status.NumberReady != 2 { t.Errorf("Wrong daemon %s status: %v", updated.Name, updated.Status) } } func TestObservedGeneration(t *testing.T) { - daemon := newDaemonSet("foo") - daemon.Generation = 1 + ds := newDaemonSet("foo") + ds.Generation = 1 manager, podControl, clientset := newTestController() var updated *extensions.DaemonSet 
clientset.PrependReactor("update", "daemonsets", func(action core.Action) (handled bool, ret runtime.Object, err error) { @@ -791,12 +842,12 @@ func TestObservedGeneration(t *testing.T) { }) addNodes(manager.nodeStore, 0, 1, simpleNodeLabel) - addPods(manager.podStore, "node-0", simpleDaemonSetLabel, 1) - manager.dsStore.Add(daemon) + addPods(manager.podStore, "node-0", simpleDaemonSetLabel, ds, 1) + manager.dsStore.Add(ds) - syncAndValidateDaemonSets(t, manager, daemon, podControl, 0, 0) - if updated.Status.ObservedGeneration != daemon.Generation { - t.Errorf("Wrong ObservedGeneration for daemon %s in status. Expected %d, got %d", updated.Name, daemon.Generation, updated.Status.ObservedGeneration) + syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0) + if updated.Status.ObservedGeneration != ds.Generation { + t.Errorf("Wrong ObservedGeneration for daemon %s in status. Expected %d, got %d", updated.Name, ds.Generation, updated.Status.ObservedGeneration) } } @@ -816,11 +867,11 @@ func TestDaemonKillFailedPods(t *testing.T) { for _, test := range tests { t.Logf("test case: %s\n", test.test) manager, podControl, _ := newTestController() - addNodes(manager.nodeStore, 0, 1, nil) - addFailedPods(manager.podStore, "node-0", simpleDaemonSetLabel, test.numFailedPods) - addPods(manager.podStore, "node-0", simpleDaemonSetLabel, test.numNormalPods) ds := newDaemonSet("foo") manager.dsStore.Add(ds) + addNodes(manager.nodeStore, 0, 1, nil) + addFailedPods(manager.podStore, "node-0", simpleDaemonSetLabel, ds, test.numFailedPods) + addPods(manager.podStore, "node-0", simpleDaemonSetLabel, ds, test.numNormalPods) syncAndValidateDaemonSets(t, manager, ds, podControl, test.expectedCreates, test.expectedDeletes) } } @@ -1167,3 +1218,300 @@ func TestUpdateNode(t *testing.T) { } } } + +func TestGetNodesToDaemonPods(t *testing.T) { + manager, _, _ := newTestController() + ds := newDaemonSet("foo") + ds2 := newDaemonSet("foo2") + manager.dsStore.Add(ds) + manager.dsStore.Add(ds2) + addNodes(manager.nodeStore, 0, 2, nil) + + // These pods should be returned. + wantedPods := []*v1.Pod{ + newPod("matching-owned-0-", "node-0", simpleDaemonSetLabel, ds), + newPod("matching-orphan-0-", "node-0", simpleDaemonSetLabel, nil), + newPod("matching-owned-1-", "node-1", simpleDaemonSetLabel, ds), + newPod("matching-orphan-1-", "node-1", simpleDaemonSetLabel, nil), + } + failedPod := newPod("matching-owned-failed-pod-1-", "node-1", simpleDaemonSetLabel, ds) + failedPod.Status = v1.PodStatus{Phase: v1.PodFailed} + wantedPods = append(wantedPods, failedPod) + for _, pod := range wantedPods { + manager.podStore.Add(pod) + } + + // These pods should be ignored. 
+ ignoredPods := []*v1.Pod{ + newPod("non-matching-owned-0-", "node-0", simpleDaemonSetLabel2, ds), + newPod("non-matching-orphan-1-", "node-1", simpleDaemonSetLabel2, nil), + newPod("matching-owned-by-other-0-", "node-0", simpleDaemonSetLabel, ds2), + } + for _, pod := range ignoredPods { + manager.podStore.Add(pod) + } + + nodesToDaemonPods, err := manager.getNodesToDaemonPods(ds) + if err != nil { + t.Fatalf("getNodesToDaemonPods() error: %v", err) + } + gotPods := map[string]bool{} + for node, pods := range nodesToDaemonPods { + for _, pod := range pods { + if pod.Spec.NodeName != node { + t.Errorf("pod %v grouped into %v but belongs in %v", pod.Name, node, pod.Spec.NodeName) + } + gotPods[pod.Name] = true + } + } + for _, pod := range wantedPods { + if !gotPods[pod.Name] { + t.Errorf("expected pod %v but didn't get it", pod.Name) + } + delete(gotPods, pod.Name) + } + for podName := range gotPods { + t.Errorf("unexpected pod %v was returned", podName) + } +} + +func TestAddPod(t *testing.T) { + manager, _, _ := newTestController() + ds1 := newDaemonSet("foo1") + ds2 := newDaemonSet("foo2") + manager.dsStore.Add(ds1) + manager.dsStore.Add(ds2) + + pod1 := newPod("pod1-", "node-0", simpleDaemonSetLabel, ds1) + manager.addPod(pod1) + if got, want := manager.queue.Len(), 1; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + key, done := manager.queue.Get() + if key == nil || done { + t.Fatalf("failed to enqueue controller for pod %v", pod1.Name) + } + expectedKey, _ := controller.KeyFunc(ds1) + if got, want := key.(string), expectedKey; got != want { + t.Errorf("queue.Get() = %v, want %v", got, want) + } + + pod2 := newPod("pod2-", "node-0", simpleDaemonSetLabel, ds2) + manager.addPod(pod2) + if got, want := manager.queue.Len(), 1; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + key, done = manager.queue.Get() + if key == nil || done { + t.Fatalf("failed to enqueue controller for pod %v", pod2.Name) + } + expectedKey, _ = controller.KeyFunc(ds2) + if got, want := key.(string), expectedKey; got != want { + t.Errorf("queue.Get() = %v, want %v", got, want) + } +} + +func TestAddPodOrphan(t *testing.T) { + manager, _, _ := newTestController() + ds1 := newDaemonSet("foo1") + ds2 := newDaemonSet("foo2") + ds3 := newDaemonSet("foo3") + ds3.Spec.Selector.MatchLabels = simpleDaemonSetLabel2 + manager.dsStore.Add(ds1) + manager.dsStore.Add(ds2) + manager.dsStore.Add(ds3) + + // Make pod an orphan. Expect matching sets to be queued. 
+ pod := newPod("pod1-", "node-0", simpleDaemonSetLabel, nil) + manager.addPod(pod) + if got, want := manager.queue.Len(), 2; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + if got, want := getQueuedKeys(manager.queue), []string{"default/foo1", "default/foo2"}; !reflect.DeepEqual(got, want) { + t.Errorf("getQueuedKeys() = %v, want %v", got, want) + } +} + +func TestUpdatePod(t *testing.T) { + manager, _, _ := newTestController() + ds1 := newDaemonSet("foo1") + ds2 := newDaemonSet("foo2") + manager.dsStore.Add(ds1) + manager.dsStore.Add(ds2) + + pod1 := newPod("pod1-", "node-0", simpleDaemonSetLabel, ds1) + prev := *pod1 + bumpResourceVersion(pod1) + manager.updatePod(&prev, pod1) + if got, want := manager.queue.Len(), 1; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + key, done := manager.queue.Get() + if key == nil || done { + t.Fatalf("failed to enqueue controller for pod %v", pod1.Name) + } + expectedKey, _ := controller.KeyFunc(ds1) + if got, want := key.(string), expectedKey; got != want { + t.Errorf("queue.Get() = %v, want %v", got, want) + } + + pod2 := newPod("pod2-", "node-0", simpleDaemonSetLabel, ds2) + prev = *pod2 + bumpResourceVersion(pod2) + manager.updatePod(&prev, pod2) + if got, want := manager.queue.Len(), 1; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + key, done = manager.queue.Get() + if key == nil || done { + t.Fatalf("failed to enqueue controller for pod %v", pod2.Name) + } + expectedKey, _ = controller.KeyFunc(ds2) + if got, want := key.(string), expectedKey; got != want { + t.Errorf("queue.Get() = %v, want %v", got, want) + } +} + +func TestUpdatePodOrphanSameLabels(t *testing.T) { + manager, _, _ := newTestController() + ds1 := newDaemonSet("foo1") + ds2 := newDaemonSet("foo2") + manager.dsStore.Add(ds1) + manager.dsStore.Add(ds2) + + pod := newPod("pod1-", "node-0", simpleDaemonSetLabel, nil) + prev := *pod + bumpResourceVersion(pod) + manager.updatePod(&prev, pod) + if got, want := manager.queue.Len(), 0; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } +} + +func TestUpdatePodOrphanWithNewLabels(t *testing.T) { + manager, _, _ := newTestController() + ds1 := newDaemonSet("foo1") + ds2 := newDaemonSet("foo2") + manager.dsStore.Add(ds1) + manager.dsStore.Add(ds2) + + pod := newPod("pod1-", "node-0", simpleDaemonSetLabel, nil) + prev := *pod + prev.Labels = map[string]string{"foo2": "bar2"} + bumpResourceVersion(pod) + manager.updatePod(&prev, pod) + if got, want := manager.queue.Len(), 2; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + if got, want := getQueuedKeys(manager.queue), []string{"default/foo1", "default/foo2"}; !reflect.DeepEqual(got, want) { + t.Errorf("getQueuedKeys() = %v, want %v", got, want) + } +} + +func TestUpdatePodChangeControllerRef(t *testing.T) { + manager, _, _ := newTestController() + ds1 := newDaemonSet("foo1") + ds2 := newDaemonSet("foo2") + manager.dsStore.Add(ds1) + manager.dsStore.Add(ds2) + + pod := newPod("pod1-", "node-0", simpleDaemonSetLabel, ds1) + prev := *pod + prev.OwnerReferences = []metav1.OwnerReference{*newControllerRef(ds2)} + bumpResourceVersion(pod) + manager.updatePod(&prev, pod) + if got, want := manager.queue.Len(), 2; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } +} + +func TestUpdatePodControllerRefRemoved(t *testing.T) { + manager, _, _ := newTestController() + ds1 := newDaemonSet("foo1") + ds2 := newDaemonSet("foo2") + manager.dsStore.Add(ds1) + manager.dsStore.Add(ds2) + + 
pod := newPod("pod1-", "node-0", simpleDaemonSetLabel, ds1) + prev := *pod + pod.OwnerReferences = nil + bumpResourceVersion(pod) + manager.updatePod(&prev, pod) + if got, want := manager.queue.Len(), 2; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } +} + +func TestDeletePod(t *testing.T) { + manager, _, _ := newTestController() + ds1 := newDaemonSet("foo1") + ds2 := newDaemonSet("foo2") + manager.dsStore.Add(ds1) + manager.dsStore.Add(ds2) + + pod1 := newPod("pod1-", "node-0", simpleDaemonSetLabel, ds1) + manager.deletePod(pod1) + if got, want := manager.queue.Len(), 1; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + key, done := manager.queue.Get() + if key == nil || done { + t.Fatalf("failed to enqueue controller for pod %v", pod1.Name) + } + expectedKey, _ := controller.KeyFunc(ds1) + if got, want := key.(string), expectedKey; got != want { + t.Errorf("queue.Get() = %v, want %v", got, want) + } + + pod2 := newPod("pod2-", "node-0", simpleDaemonSetLabel, ds2) + manager.deletePod(pod2) + if got, want := manager.queue.Len(), 1; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + key, done = manager.queue.Get() + if key == nil || done { + t.Fatalf("failed to enqueue controller for pod %v", pod2.Name) + } + expectedKey, _ = controller.KeyFunc(ds2) + if got, want := key.(string), expectedKey; got != want { + t.Errorf("queue.Get() = %v, want %v", got, want) + } +} + +func TestDeletePodOrphan(t *testing.T) { + manager, _, _ := newTestController() + ds1 := newDaemonSet("foo1") + ds2 := newDaemonSet("foo2") + ds3 := newDaemonSet("foo3") + ds3.Spec.Selector.MatchLabels = simpleDaemonSetLabel2 + manager.dsStore.Add(ds1) + manager.dsStore.Add(ds2) + manager.dsStore.Add(ds3) + + pod := newPod("pod1-", "node-0", simpleDaemonSetLabel, nil) + manager.deletePod(pod) + if got, want := manager.queue.Len(), 0; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } +} + +func bumpResourceVersion(obj metav1.Object) { + ver, _ := strconv.ParseInt(obj.GetResourceVersion(), 10, 32) + obj.SetResourceVersion(strconv.FormatInt(ver+1, 10)) +} + +// getQueuedKeys returns a sorted list of keys in the queue. +// It can be used to quickly check that multiple keys are in there. +func getQueuedKeys(queue workqueue.RateLimitingInterface) []string { + var keys []string + count := queue.Len() + for i := 0; i < count; i++ { + key, done := queue.Get() + if done { + return keys + } + keys = append(keys, key.(string)) + } + sort.Strings(keys) + return keys +} diff --git a/pkg/controller/daemon/update.go b/pkg/controller/daemon/update.go index f13afba8330..56645b4eb14 100644 --- a/pkg/controller/daemon/update.go +++ b/pkg/controller/daemon/update.go @@ -32,10 +32,13 @@ import ( // rollingUpdate deletes old daemon set pods making sure that no more than // ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable pods are unavailable func (dsc *DaemonSetsController) rollingUpdate(ds *extensions.DaemonSet) error { - newPods, oldPods, err := dsc.getAllDaemonSetPods(ds) - allPods := append(oldPods, newPods...) 
+ nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds) + if err != nil { + return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err) + } - maxUnavailable, numUnavailable, err := dsc.getUnavailableNumbers(ds, allPods) + _, oldPods, err := dsc.getAllDaemonSetPods(ds, nodeToDaemonPods) + maxUnavailable, numUnavailable, err := dsc.getUnavailableNumbers(ds, nodeToDaemonPods) if err != nil { return fmt.Errorf("Couldn't get unavailable numbers: %v", err) } @@ -67,29 +70,23 @@ func (dsc *DaemonSetsController) rollingUpdate(ds *extensions.DaemonSet) error { return utilerrors.NewAggregate(errors) } -func (dsc *DaemonSetsController) getAllDaemonSetPods(ds *extensions.DaemonSet) ([]*v1.Pod, []*v1.Pod, error) { +func (dsc *DaemonSetsController) getAllDaemonSetPods(ds *extensions.DaemonSet, nodeToDaemonPods map[string][]*v1.Pod) ([]*v1.Pod, []*v1.Pod, error) { var newPods []*v1.Pod var oldPods []*v1.Pod - selector, err := metav1.LabelSelectorAsSelector(ds.Spec.Selector) - if err != nil { - return newPods, oldPods, err - } - daemonPods, err := dsc.podLister.Pods(ds.Namespace).List(selector) - if err != nil { - return newPods, oldPods, fmt.Errorf("Couldn't get list of pods for daemon set %#v: %v", ds, err) - } - for _, pod := range daemonPods { - if util.IsPodUpdated(ds.Spec.TemplateGeneration, pod) { - newPods = append(newPods, pod) - } else { - oldPods = append(oldPods, pod) + for _, pods := range nodeToDaemonPods { + for _, pod := range pods { + if util.IsPodUpdated(ds.Spec.TemplateGeneration, pod) { + newPods = append(newPods, pod) + } else { + oldPods = append(oldPods, pod) + } } } return newPods, oldPods, nil } -func (dsc *DaemonSetsController) getUnavailableNumbers(ds *extensions.DaemonSet, allPods []*v1.Pod) (int, int, error) { +func (dsc *DaemonSetsController) getUnavailableNumbers(ds *extensions.DaemonSet, nodeToDaemonPods map[string][]*v1.Pod) (int, int, error) { glog.V(4).Infof("Getting unavailable numbers") // TODO: get nodeList once in syncDaemonSet and pass it to other functions nodeList, err := dsc.nodeLister.List(labels.Everything()) @@ -97,11 +94,6 @@ func (dsc *DaemonSetsController) getUnavailableNumbers(ds *extensions.DaemonSet, return -1, -1, fmt.Errorf("couldn't get list of nodes during rolling update of daemon set %#v: %v", ds, err) } - nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds) - if err != nil { - return -1, -1, fmt.Errorf("couldn't get node to daemon pods mapping for daemon set %#v: %v", ds, err) - } - var numUnavailable, desiredNumberScheduled int for i := range nodeList { node := nodeList[i] diff --git a/pkg/registry/extensions/daemonset/BUILD b/pkg/registry/extensions/daemonset/BUILD index 580a1a9fa10..d523cf24486 100644 --- a/pkg/registry/extensions/daemonset/BUILD +++ b/pkg/registry/extensions/daemonset/BUILD @@ -25,6 +25,7 @@ go_library( "//vendor:k8s.io/apimachinery/pkg/util/validation/field", "//vendor:k8s.io/apiserver/pkg/endpoints/request", "//vendor:k8s.io/apiserver/pkg/registry/generic", + "//vendor:k8s.io/apiserver/pkg/registry/rest", "//vendor:k8s.io/apiserver/pkg/storage", "//vendor:k8s.io/apiserver/pkg/storage/names", ], @@ -40,6 +41,7 @@ go_test( "//pkg/api/testapi:go_default_library", "//pkg/api/testing:go_default_library", "//pkg/apis/extensions:go_default_library", + "//vendor:k8s.io/apiserver/pkg/registry/rest", ], ) diff --git a/pkg/registry/extensions/daemonset/strategy.go b/pkg/registry/extensions/daemonset/strategy.go index 6d02f60e725..2e508bdb998 100644 --- a/pkg/registry/extensions/daemonset/strategy.go 
+++ b/pkg/registry/extensions/daemonset/strategy.go @@ -26,6 +26,7 @@ import ( "k8s.io/apimachinery/pkg/util/validation/field" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/generic" + "k8s.io/apiserver/pkg/registry/rest" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/names" "k8s.io/kubernetes/pkg/api" @@ -42,6 +43,12 @@ type daemonSetStrategy struct { // Strategy is the default logic that applies when creating and updating DaemonSet objects. var Strategy = daemonSetStrategy{api.Scheme, names.SimpleNameGenerator} +// DefaultGarbageCollectionPolicy returns Orphan because that was the default +// behavior before the server-side garbage collection was implemented. +func (daemonSetStrategy) DefaultGarbageCollectionPolicy() rest.GarbageCollectionPolicy { + return rest.OrphanDependents +} + // NamespaceScoped returns true because all DaemonSets need to be within a namespace. func (daemonSetStrategy) NamespaceScoped() bool { return true diff --git a/pkg/registry/extensions/daemonset/strategy_test.go b/pkg/registry/extensions/daemonset/strategy_test.go index 306eccd307a..0590c76473a 100644 --- a/pkg/registry/extensions/daemonset/strategy_test.go +++ b/pkg/registry/extensions/daemonset/strategy_test.go @@ -19,6 +19,7 @@ package daemonset import ( "testing" + "k8s.io/apiserver/pkg/registry/rest" _ "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/testapi" apitesting "k8s.io/kubernetes/pkg/api/testing" @@ -33,3 +34,12 @@ func TestSelectableFieldLabelConversions(t *testing.T) { nil, ) } + +func TestDefaultGarbageCollectionPolicy(t *testing.T) { + // Make sure we correctly implement the interface. + // Otherwise a typo could silently change the default. + var gcds rest.GarbageCollectionDeleteStrategy = Strategy + if got, want := gcds.DefaultGarbageCollectionPolicy(), rest.OrphanDependents; got != want { + t.Errorf("DefaultGarbageCollectionPolicy() = %#v, want %#v", got, want) + } +} diff --git a/staging/src/k8s.io/client-go/listers/extensions/v1beta1/daemonset_expansion.go b/staging/src/k8s.io/client-go/listers/extensions/v1beta1/daemonset_expansion.go index dc17c105cf8..019cdc40a8a 100644 --- a/staging/src/k8s.io/client-go/listers/extensions/v1beta1/daemonset_expansion.go +++ b/staging/src/k8s.io/client-go/listers/extensions/v1beta1/daemonset_expansion.go @@ -35,8 +35,9 @@ type DaemonSetListerExpansion interface { // DaemonSetNamespaeLister. type DaemonSetNamespaceListerExpansion interface{} -// GetPodDaemonSets returns a list of daemon sets managing a pod. -// Returns an error if and only if no matching daemon sets are found. +// GetPodDaemonSets returns a list of DaemonSets that potentially match a pod. +// Only the one specified in the Pod's ControllerRef will actually manage it. +// Returns an error only if no matching DaemonSets are found. func (s *daemonSetLister) GetPodDaemonSets(pod *v1.Pod) ([]*v1beta1.DaemonSet, error) { var selector labels.Selector var daemonSet *v1beta1.DaemonSet