From 8f3a56f582b29af6b295d9e0a0155d9ddb161e13 Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Sat, 25 Feb 2017 10:56:58 -0800 Subject: [PATCH 01/10] DaemonSet: Add ControllerRef on all created Pods. --- pkg/controller/controller_utils.go | 34 +++++++++++++------ pkg/controller/daemon/daemoncontroller.go | 13 ++++++- .../daemon/daemoncontroller_test.go | 20 +++++++++-- 3 files changed, 53 insertions(+), 14 deletions(-) diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go index e090ee93fbf..48d963b7951 100644 --- a/pkg/controller/controller_utils.go +++ b/pkg/controller/controller_utils.go @@ -400,8 +400,9 @@ func (r RealRSControl) PatchReplicaSet(namespace, name string, data []byte) erro type PodControlInterface interface { // CreatePods creates new pods according to the spec. CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object) error - // CreatePodsOnNode creates a new pod according to the spec on the specified node. - CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object) error + // CreatePodsOnNode creates a new pod according to the spec on the specified node, + // and sets the ControllerRef. + CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error // CreatePodsWithControllerRef creates new pods according to the spec, and sets object as the pod's controller. CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error // DeletePod deletes the pod identified by podID. @@ -466,11 +467,7 @@ func getPodsPrefix(controllerName string) string { return prefix } -func (r RealPodControl) CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object) error { - return r.createPods("", namespace, template, object, nil) -} - -func (r RealPodControl) CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error { +func validateControllerRef(controllerRef *metav1.OwnerReference) error { if controllerRef == nil { return fmt.Errorf("controllerRef is nil") } @@ -481,16 +478,30 @@ func (r RealPodControl) CreatePodsWithControllerRef(namespace string, template * return fmt.Errorf("controllerRef has empty Kind") } if controllerRef.Controller == nil || *controllerRef.Controller != true { - return fmt.Errorf("controllerRef.Controller is not set") + return fmt.Errorf("controllerRef.Controller is not set to true") } if controllerRef.BlockOwnerDeletion == nil || *controllerRef.BlockOwnerDeletion != true { return fmt.Errorf("controllerRef.BlockOwnerDeletion is not set") } + return nil +} + +func (r RealPodControl) CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object) error { + return r.createPods("", namespace, template, object, nil) +} + +func (r RealPodControl) CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error { + if err := validateControllerRef(controllerRef); err != nil { + return err + } return r.createPods("", namespace, template, controllerObject, controllerRef) } -func (r RealPodControl) CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object) error { - return r.createPods(nodeName, namespace, template, object, nil) +func (r RealPodControl) 
CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { + if err := validateControllerRef(controllerRef); err != nil { + return err + } + return r.createPods(nodeName, namespace, template, object, controllerRef) } func (r RealPodControl) PatchPod(namespace, name string, data []byte) error { @@ -613,10 +624,11 @@ func (f *FakePodControl) CreatePodsWithControllerRef(namespace string, spec *v1. return nil } -func (f *FakePodControl) CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object) error { +func (f *FakePodControl) CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { f.Lock() defer f.Unlock() f.Templates = append(f.Templates, *template) + f.ControllerRefs = append(f.ControllerRefs, *controllerRef) if f.Err != nil { return f.Err } diff --git a/pkg/controller/daemon/daemoncontroller.go b/pkg/controller/daemon/daemoncontroller.go index 62c100e4f60..8238822a262 100644 --- a/pkg/controller/daemon/daemoncontroller.go +++ b/pkg/controller/daemon/daemoncontroller.go @@ -72,6 +72,9 @@ const ( FailedDaemonPodReason = "FailedDaemonPod" ) +// controllerKind contains the schema.GroupVersionKind for this controller type. +var controllerKind = extensions.SchemeGroupVersion.WithKind("DaemonSet") + // DaemonSetsController is responsible for synchronizing DaemonSet objects stored // in the system with actual running pods. type DaemonSetsController struct { @@ -586,7 +589,15 @@ func (dsc *DaemonSetsController) syncNodes(ds *extensions.DaemonSet, podsToDelet for i := 0; i < createDiff; i++ { go func(ix int) { defer createWait.Done() - if err := dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, &template, ds); err != nil { + isController := true + controllerRef := &metav1.OwnerReference{ + APIVersion: controllerKind.GroupVersion().String(), + Kind: controllerKind.Kind, + Name: ds.Name, + UID: ds.UID, + Controller: &isController, + } + if err := dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, &template, ds, controllerRef); err != nil { glog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name) dsc.expectations.CreationObserved(dsKey) errCh <- err diff --git a/pkg/controller/daemon/daemoncontroller_test.go b/pkg/controller/daemon/daemoncontroller_test.go index bb2abf7d6f2..2b8397d2271 100644 --- a/pkg/controller/daemon/daemoncontroller_test.go +++ b/pkg/controller/daemon/daemoncontroller_test.go @@ -183,10 +183,10 @@ func newFakePodControl() *fakePodControl { podIDMap: podIDMap} } -func (f *fakePodControl) CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object) error { +func (f *fakePodControl) CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { f.Lock() defer f.Unlock() - if err := f.FakePodControl.CreatePodsOnNode(nodeName, namespace, template, object); err != nil { + if err := f.FakePodControl.CreatePodsOnNode(nodeName, namespace, template, object, controllerRef); err != nil { return fmt.Errorf("failed to create pod on node %q", nodeName) } @@ -269,6 +269,22 @@ func validateSyncDaemonSets(t *testing.T, fakePodControl *fakePodControl, expect if len(fakePodControl.DeletePodName) != expectedDeletes { t.Errorf("Unexpected number of deletes. 
Expected %d, saw %d\n", expectedDeletes, len(fakePodControl.DeletePodName)) } + // Every Pod created should have a ControllerRef. + if got, want := len(fakePodControl.ControllerRefs), expectedCreates; got != want { + t.Errorf("len(ControllerRefs) = %v, want %v", got, want) + } + // Make sure the ControllerRefs are correct. + for _, controllerRef := range fakePodControl.ControllerRefs { + if got, want := controllerRef.APIVersion, "extensions/v1beta1"; got != want { + t.Errorf("controllerRef.APIVersion = %q, want %q", got, want) + } + if got, want := controllerRef.Kind, "DaemonSet"; got != want { + t.Errorf("controllerRef.Kind = %q, want %q", got, want) + } + if controllerRef.Controller == nil || *controllerRef.Controller != true { + t.Errorf("controllerRef.Controller is not set to true") + } + } } func syncAndValidateDaemonSets(t *testing.T, manager *daemonSetsController, ds *extensions.DaemonSet, podControl *fakePodControl, expectedCreates, expectedDeletes int) { From 2217363845b07b155e7e095cca1b4915ce5da853 Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Sat, 25 Feb 2017 11:05:24 -0800 Subject: [PATCH 02/10] DaemonSet: Set DefaultGarbageCollectionPolicy to OrphanDependents. Now that DaemonSet adds ControllerRef to Pods it creates, we need to set this default so legacy behavior is maintained. --- pkg/registry/extensions/daemonset/BUILD | 2 ++ pkg/registry/extensions/daemonset/strategy.go | 7 +++++++ pkg/registry/extensions/daemonset/strategy_test.go | 10 ++++++++++ 3 files changed, 19 insertions(+) diff --git a/pkg/registry/extensions/daemonset/BUILD b/pkg/registry/extensions/daemonset/BUILD index 580a1a9fa10..d523cf24486 100644 --- a/pkg/registry/extensions/daemonset/BUILD +++ b/pkg/registry/extensions/daemonset/BUILD @@ -25,6 +25,7 @@ go_library( "//vendor:k8s.io/apimachinery/pkg/util/validation/field", "//vendor:k8s.io/apiserver/pkg/endpoints/request", "//vendor:k8s.io/apiserver/pkg/registry/generic", + "//vendor:k8s.io/apiserver/pkg/registry/rest", "//vendor:k8s.io/apiserver/pkg/storage", "//vendor:k8s.io/apiserver/pkg/storage/names", ], @@ -40,6 +41,7 @@ go_test( "//pkg/api/testapi:go_default_library", "//pkg/api/testing:go_default_library", "//pkg/apis/extensions:go_default_library", + "//vendor:k8s.io/apiserver/pkg/registry/rest", ], ) diff --git a/pkg/registry/extensions/daemonset/strategy.go b/pkg/registry/extensions/daemonset/strategy.go index 6d02f60e725..2e508bdb998 100644 --- a/pkg/registry/extensions/daemonset/strategy.go +++ b/pkg/registry/extensions/daemonset/strategy.go @@ -26,6 +26,7 @@ import ( "k8s.io/apimachinery/pkg/util/validation/field" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/generic" + "k8s.io/apiserver/pkg/registry/rest" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/names" "k8s.io/kubernetes/pkg/api" @@ -42,6 +43,12 @@ type daemonSetStrategy struct { // Strategy is the default logic that applies when creating and updating DaemonSet objects. var Strategy = daemonSetStrategy{api.Scheme, names.SimpleNameGenerator} +// DefaultGarbageCollectionPolicy returns Orphan because that was the default +// behavior before the server-side garbage collection was implemented. +func (daemonSetStrategy) DefaultGarbageCollectionPolicy() rest.GarbageCollectionPolicy { + return rest.OrphanDependents +} + // NamespaceScoped returns true because all DaemonSets need to be within a namespace. 
func (daemonSetStrategy) NamespaceScoped() bool { return true diff --git a/pkg/registry/extensions/daemonset/strategy_test.go b/pkg/registry/extensions/daemonset/strategy_test.go index 306eccd307a..0590c76473a 100644 --- a/pkg/registry/extensions/daemonset/strategy_test.go +++ b/pkg/registry/extensions/daemonset/strategy_test.go @@ -19,6 +19,7 @@ package daemonset import ( "testing" + "k8s.io/apiserver/pkg/registry/rest" _ "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/testapi" apitesting "k8s.io/kubernetes/pkg/api/testing" @@ -33,3 +34,12 @@ func TestSelectableFieldLabelConversions(t *testing.T) { nil, ) } + +func TestDefaultGarbageCollectionPolicy(t *testing.T) { + // Make sure we correctly implement the interface. + // Otherwise a typo could silently change the default. + var gcds rest.GarbageCollectionDeleteStrategy = Strategy + if got, want := gcds.DefaultGarbageCollectionPolicy(), rest.OrphanDependents; got != want { + t.Errorf("DefaultGarbageCollectionPolicy() = %#v, want %#v", got, want) + } +} From 421e0bbd83ab234ce1e99f898d8898600342addb Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Sat, 25 Feb 2017 16:22:54 -0800 Subject: [PATCH 03/10] DaemonSet: Use ControllerRefManager to adopt/orphan. --- pkg/controller/daemon/BUILD | 1 + pkg/controller/daemon/daemoncontroller.go | 75 +++++---- .../daemon/daemoncontroller_test.go | 153 ++++++++++++------ pkg/controller/daemon/update.go | 37 ++--- 4 files changed, 163 insertions(+), 103 deletions(-) diff --git a/pkg/controller/daemon/BUILD b/pkg/controller/daemon/BUILD index 1301d9f1043..3ca29d93d4d 100644 --- a/pkg/controller/daemon/BUILD +++ b/pkg/controller/daemon/BUILD @@ -73,6 +73,7 @@ go_test( "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/runtime", "//vendor:k8s.io/apimachinery/pkg/util/intstr", + "//vendor:k8s.io/apimachinery/pkg/util/uuid", "//vendor:k8s.io/apiserver/pkg/storage/names", "//vendor:k8s.io/apiserver/pkg/util/feature", "//vendor:k8s.io/client-go/testing", diff --git a/pkg/controller/daemon/daemoncontroller.go b/pkg/controller/daemon/daemoncontroller.go index 8238822a262..32eb8778758 100644 --- a/pkg/controller/daemon/daemoncontroller.go +++ b/pkg/controller/daemon/daemoncontroller.go @@ -465,32 +465,37 @@ func (dsc *DaemonSetsController) updateNode(old, cur interface{}) { } // getNodesToDaemonSetPods returns a map from nodes to daemon pods (corresponding to ds) running on the nodes. +// This also reconciles ControllerRef by adopting/orphaning. +// Note that returned Pods are pointers to objects in the cache. +// If you want to modify one, you need to deep-copy it first. func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *extensions.DaemonSet) (map[string][]*v1.Pod, error) { - nodeToDaemonPods := make(map[string][]*v1.Pod) selector, err := metav1.LabelSelectorAsSelector(ds.Spec.Selector) if err != nil { return nil, err } - daemonPods, err := dsc.podLister.Pods(ds.Namespace).List(selector) + + // List all pods to include those that don't match the selector anymore but + // have a ControllerRef pointing to this controller. + pods, err := dsc.podLister.Pods(ds.Namespace).List(labels.Everything()) if err != nil { - return nodeToDaemonPods, err + return nil, err } - for i := range daemonPods { - // TODO: Do we need to copy here? - daemonPod := &(*daemonPods[i]) - nodeName := daemonPod.Spec.NodeName - nodeToDaemonPods[nodeName] = append(nodeToDaemonPods[nodeName], daemonPod) + // Use ControllerRefManager to adopt/orphan as needed. 
+ cm := controller.NewPodControllerRefManager(dsc.podControl, ds, selector, ControllerKind) + claimedPods, err := cm.ClaimPods(pods) + if err != nil { + return nil, err + } + // Group Pods by Node name. + nodeToDaemonPods := make(map[string][]*v1.Pod) + for _, pod := range claimedPods { + nodeName := pod.Spec.NodeName + nodeToDaemonPods[nodeName] = append(nodeToDaemonPods[nodeName], pod) } return nodeToDaemonPods, nil } -func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) error { - // Find out which nodes are running the daemon pods selected by ds. - nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds) - if err != nil { - return fmt.Errorf("error getting node to daemon pod mapping for daemon set %#v: %v", ds, err) - } - +func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet, nodeToDaemonPods map[string][]*v1.Pod) error { // For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon // pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node. nodeList, err := dsc.nodeLister.List(labels.Everything()) @@ -589,15 +594,7 @@ func (dsc *DaemonSetsController) syncNodes(ds *extensions.DaemonSet, podsToDelet for i := 0; i < createDiff; i++ { go func(ix int) { defer createWait.Done() - isController := true - controllerRef := &metav1.OwnerReference{ - APIVersion: controllerKind.GroupVersion().String(), - Kind: controllerKind.Kind, - Name: ds.Name, - UID: ds.UID, - Controller: &isController, - } - if err := dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, &template, ds, controllerRef); err != nil { + if err := dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, &template, ds, newControllerRef(ds)); err != nil { glog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name) dsc.expectations.CreationObserved(dsKey) errCh <- err @@ -676,12 +673,8 @@ func storeDaemonSetStatus(dsClient unversionedextensions.DaemonSetInterface, ds return updateErr } -func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *extensions.DaemonSet) error { +func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *extensions.DaemonSet, nodeToDaemonPods map[string][]*v1.Pod) error { glog.V(4).Infof("Updating daemon set status") - nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds) - if err != nil { - return fmt.Errorf("error getting node to daemon pod mapping for daemon set %#v: %v", ds, err) - } nodeList, err := dsc.nodeLister.List(labels.Everything()) if err != nil { @@ -758,6 +751,12 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error { return nil } + // Find out which nodes are running the daemon pods controlled by ds. + nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds) + if err != nil { + return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err) + } + // Don't process a daemon set until all its creations and deletions have been processed. // For example if daemon set foo asked for 3 new daemon pods in the previous call to manage, // then we do not want to call manage on foo until the daemon pods have been created. 
@@ -767,7 +766,7 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error { } dsNeedsSync := dsc.expectations.SatisfiedExpectations(dsKey) if dsNeedsSync && ds.DeletionTimestamp == nil { - if err := dsc.manage(ds); err != nil { + if err := dsc.manage(ds, nodeToDaemonPods); err != nil { return err } } @@ -776,14 +775,14 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error { if dsNeedsSync && ds.DeletionTimestamp == nil { switch ds.Spec.UpdateStrategy.Type { case extensions.RollingUpdateDaemonSetStrategyType: - err = dsc.rollingUpdate(ds) + err = dsc.rollingUpdate(ds, nodeToDaemonPods) } if err != nil { return err } } - return dsc.updateDaemonSetStatus(ds) + return dsc.updateDaemonSetStatus(ds, nodeToDaemonPods) } // nodeShouldRunDaemonPod checks a set of preconditions against a (node,daemonset) and returns a @@ -972,6 +971,18 @@ func Predicates(pod *v1.Pod, nodeInfo *schedulercache.NodeInfo) (bool, []algorit return len(predicateFails) == 0, predicateFails, nil } +// newControllerRef creates a ControllerRef pointing to the given DaemonSet. +func newControllerRef(ds *extensions.DaemonSet) *metav1.OwnerReference { + isController := true + return &metav1.OwnerReference{ + APIVersion: ControllerKind.GroupVersion().String(), + Kind: ControllerKind.Kind, + Name: ds.Name, + UID: ds.UID, + Controller: &isController, + } +} + // byCreationTimestamp sorts a list by creation timestamp, using their names as a tie breaker. type byCreationTimestamp []*extensions.DaemonSet diff --git a/pkg/controller/daemon/daemoncontroller_test.go b/pkg/controller/daemon/daemoncontroller_test.go index 2b8397d2271..ea62750ae50 100644 --- a/pkg/controller/daemon/daemoncontroller_test.go +++ b/pkg/controller/daemon/daemoncontroller_test.go @@ -24,6 +24,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apiserver/pkg/storage/names" utilfeature "k8s.io/apiserver/pkg/util/feature" core "k8s.io/client-go/testing" @@ -80,6 +81,7 @@ func newDaemonSet(name string) *extensions.DaemonSet { return &extensions.DaemonSet{ TypeMeta: metav1.TypeMeta{APIVersion: testapi.Extensions.GroupVersion().String()}, ObjectMeta: metav1.ObjectMeta{ + UID: uuid.NewUUID(), Name: name, Namespace: metav1.NamespaceDefault, }, @@ -130,7 +132,7 @@ func addNodes(nodeStore cache.Store, startIndex, numNodes int, label map[string] } } -func newPod(podName string, nodeName string, label map[string]string) *v1.Pod { +func newPod(podName string, nodeName string, label map[string]string, ds *extensions.DaemonSet) *v1.Pod { pod := &v1.Pod{ TypeMeta: metav1.TypeMeta{APIVersion: api.Registry.GroupOrDie(v1.GroupName).GroupVersion.String()}, ObjectMeta: metav1.ObjectMeta{ @@ -152,18 +154,21 @@ func newPod(podName string, nodeName string, label map[string]string) *v1.Pod { }, } pod.Name = names.SimpleNameGenerator.GenerateName(podName) + if ds != nil { + pod.OwnerReferences = []metav1.OwnerReference{*newControllerRef(ds)} + } return pod } -func addPods(podStore cache.Store, nodeName string, label map[string]string, number int) { +func addPods(podStore cache.Store, nodeName string, label map[string]string, ds *extensions.DaemonSet, number int) { for i := 0; i < number; i++ { - podStore.Add(newPod(fmt.Sprintf("%s-", nodeName), nodeName, label)) + podStore.Add(newPod(fmt.Sprintf("%s-", nodeName), nodeName, label, ds)) } } -func addFailedPods(podStore cache.Store, nodeName string, label map[string]string, number int) { +func 
addFailedPods(podStore cache.Store, nodeName string, label map[string]string, ds *extensions.DaemonSet, number int) { for i := 0; i < number; i++ { - pod := newPod(fmt.Sprintf("%s-", nodeName), nodeName, label) + pod := newPod(fmt.Sprintf("%s-", nodeName), nodeName, label, ds) pod.Status = v1.PodStatus{Phase: v1.PodFailed} podStore.Add(pod) } @@ -618,13 +623,13 @@ func TestPodIsNotDeletedByDaemonsetWithEmptyLabelSelector(t *testing.T) { // Controller should not create pods on nodes which have daemon pods, and should remove excess pods from nodes that have extra pods. func TestDealsWithExistingPods(t *testing.T) { manager, podControl, _ := newTestController() - addNodes(manager.nodeStore, 0, 5, nil) - addPods(manager.podStore, "node-1", simpleDaemonSetLabel, 1) - addPods(manager.podStore, "node-2", simpleDaemonSetLabel, 2) - addPods(manager.podStore, "node-3", simpleDaemonSetLabel, 5) - addPods(manager.podStore, "node-4", simpleDaemonSetLabel2, 2) ds := newDaemonSet("foo") manager.dsStore.Add(ds) + addNodes(manager.nodeStore, 0, 5, nil) + addPods(manager.podStore, "node-1", simpleDaemonSetLabel, ds, 1) + addPods(manager.podStore, "node-2", simpleDaemonSetLabel, ds, 2) + addPods(manager.podStore, "node-3", simpleDaemonSetLabel, ds, 5) + addPods(manager.podStore, "node-4", simpleDaemonSetLabel2, ds, 2) syncAndValidateDaemonSets(t, manager, ds, podControl, 2, 5) } @@ -642,34 +647,34 @@ func TestSelectorDaemonLaunchesPods(t *testing.T) { // Daemon with node selector should delete pods from nodes that do not satisfy selector. func TestSelectorDaemonDeletesUnselectedPods(t *testing.T) { manager, podControl, _ := newTestController() + ds := newDaemonSet("foo") + ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel + manager.dsStore.Add(ds) addNodes(manager.nodeStore, 0, 5, nil) addNodes(manager.nodeStore, 5, 5, simpleNodeLabel) - addPods(manager.podStore, "node-0", simpleDaemonSetLabel2, 2) - addPods(manager.podStore, "node-1", simpleDaemonSetLabel, 3) - addPods(manager.podStore, "node-1", simpleDaemonSetLabel2, 1) - addPods(manager.podStore, "node-4", simpleDaemonSetLabel, 1) - daemon := newDaemonSet("foo") - daemon.Spec.Template.Spec.NodeSelector = simpleNodeLabel - manager.dsStore.Add(daemon) - syncAndValidateDaemonSets(t, manager, daemon, podControl, 5, 4) + addPods(manager.podStore, "node-0", simpleDaemonSetLabel2, ds, 2) + addPods(manager.podStore, "node-1", simpleDaemonSetLabel, ds, 3) + addPods(manager.podStore, "node-1", simpleDaemonSetLabel2, ds, 1) + addPods(manager.podStore, "node-4", simpleDaemonSetLabel, ds, 1) + syncAndValidateDaemonSets(t, manager, ds, podControl, 5, 4) } // DaemonSet with node selector should launch pods on nodes matching selector, but also deal with existing pods on nodes. 
func TestSelectorDaemonDealsWithExistingPods(t *testing.T) { manager, podControl, _ := newTestController() - addNodes(manager.nodeStore, 0, 5, nil) - addNodes(manager.nodeStore, 5, 5, simpleNodeLabel) - addPods(manager.podStore, "node-0", simpleDaemonSetLabel, 1) - addPods(manager.podStore, "node-1", simpleDaemonSetLabel, 3) - addPods(manager.podStore, "node-1", simpleDaemonSetLabel2, 2) - addPods(manager.podStore, "node-2", simpleDaemonSetLabel, 4) - addPods(manager.podStore, "node-6", simpleDaemonSetLabel, 13) - addPods(manager.podStore, "node-7", simpleDaemonSetLabel2, 4) - addPods(manager.podStore, "node-9", simpleDaemonSetLabel, 1) - addPods(manager.podStore, "node-9", simpleDaemonSetLabel2, 1) ds := newDaemonSet("foo") ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel manager.dsStore.Add(ds) + addNodes(manager.nodeStore, 0, 5, nil) + addNodes(manager.nodeStore, 5, 5, simpleNodeLabel) + addPods(manager.podStore, "node-0", simpleDaemonSetLabel, ds, 1) + addPods(manager.podStore, "node-1", simpleDaemonSetLabel, ds, 3) + addPods(manager.podStore, "node-1", simpleDaemonSetLabel2, ds, 2) + addPods(manager.podStore, "node-2", simpleDaemonSetLabel, ds, 4) + addPods(manager.podStore, "node-6", simpleDaemonSetLabel, ds, 13) + addPods(manager.podStore, "node-7", simpleDaemonSetLabel2, ds, 4) + addPods(manager.podStore, "node-9", simpleDaemonSetLabel, ds, 1) + addPods(manager.podStore, "node-9", simpleDaemonSetLabel2, ds, 1) syncAndValidateDaemonSets(t, manager, ds, podControl, 3, 20) } @@ -756,7 +761,7 @@ func TestNodeAffinityDaemonLaunchesPods(t *testing.T) { } func TestNumberReadyStatus(t *testing.T) { - daemon := newDaemonSet("foo") + ds := newDaemonSet("foo") manager, podControl, clientset := newTestController() var updated *extensions.DaemonSet clientset.PrependReactor("update", "daemonsets", func(action core.Action) (handled bool, ret runtime.Object, err error) { @@ -769,31 +774,31 @@ func TestNumberReadyStatus(t *testing.T) { return false, nil, nil }) addNodes(manager.nodeStore, 0, 2, simpleNodeLabel) - addPods(manager.podStore, "node-0", simpleDaemonSetLabel, 1) - addPods(manager.podStore, "node-1", simpleDaemonSetLabel, 1) - manager.dsStore.Add(daemon) + addPods(manager.podStore, "node-0", simpleDaemonSetLabel, ds, 1) + addPods(manager.podStore, "node-1", simpleDaemonSetLabel, ds, 1) + manager.dsStore.Add(ds) - syncAndValidateDaemonSets(t, manager, daemon, podControl, 0, 0) + syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0) if updated.Status.NumberReady != 0 { t.Errorf("Wrong daemon %s status: %v", updated.Name, updated.Status) } - selector, _ := metav1.LabelSelectorAsSelector(daemon.Spec.Selector) - daemonPods, _ := manager.podLister.Pods(daemon.Namespace).List(selector) + selector, _ := metav1.LabelSelectorAsSelector(ds.Spec.Selector) + daemonPods, _ := manager.podLister.Pods(ds.Namespace).List(selector) for _, pod := range daemonPods { condition := v1.PodCondition{Type: v1.PodReady, Status: v1.ConditionTrue} pod.Status.Conditions = append(pod.Status.Conditions, condition) } - syncAndValidateDaemonSets(t, manager, daemon, podControl, 0, 0) + syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0) if updated.Status.NumberReady != 2 { t.Errorf("Wrong daemon %s status: %v", updated.Name, updated.Status) } } func TestObservedGeneration(t *testing.T) { - daemon := newDaemonSet("foo") - daemon.Generation = 1 + ds := newDaemonSet("foo") + ds.Generation = 1 manager, podControl, clientset := newTestController() var updated *extensions.DaemonSet 
clientset.PrependReactor("update", "daemonsets", func(action core.Action) (handled bool, ret runtime.Object, err error) { @@ -807,12 +812,12 @@ func TestObservedGeneration(t *testing.T) { }) addNodes(manager.nodeStore, 0, 1, simpleNodeLabel) - addPods(manager.podStore, "node-0", simpleDaemonSetLabel, 1) - manager.dsStore.Add(daemon) + addPods(manager.podStore, "node-0", simpleDaemonSetLabel, ds, 1) + manager.dsStore.Add(ds) - syncAndValidateDaemonSets(t, manager, daemon, podControl, 0, 0) - if updated.Status.ObservedGeneration != daemon.Generation { - t.Errorf("Wrong ObservedGeneration for daemon %s in status. Expected %d, got %d", updated.Name, daemon.Generation, updated.Status.ObservedGeneration) + syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0) + if updated.Status.ObservedGeneration != ds.Generation { + t.Errorf("Wrong ObservedGeneration for daemon %s in status. Expected %d, got %d", updated.Name, ds.Generation, updated.Status.ObservedGeneration) } } @@ -832,11 +837,11 @@ func TestDaemonKillFailedPods(t *testing.T) { for _, test := range tests { t.Logf("test case: %s\n", test.test) manager, podControl, _ := newTestController() - addNodes(manager.nodeStore, 0, 1, nil) - addFailedPods(manager.podStore, "node-0", simpleDaemonSetLabel, test.numFailedPods) - addPods(manager.podStore, "node-0", simpleDaemonSetLabel, test.numNormalPods) ds := newDaemonSet("foo") manager.dsStore.Add(ds) + addNodes(manager.nodeStore, 0, 1, nil) + addFailedPods(manager.podStore, "node-0", simpleDaemonSetLabel, ds, test.numFailedPods) + addPods(manager.podStore, "node-0", simpleDaemonSetLabel, ds, test.numNormalPods) syncAndValidateDaemonSets(t, manager, ds, podControl, test.expectedCreates, test.expectedDeletes) } } @@ -1183,3 +1188,59 @@ func TestUpdateNode(t *testing.T) { } } } + +func TestGetNodesToDaemonPods(t *testing.T) { + manager, _, _ := newTestController() + ds := newDaemonSet("foo") + ds2 := newDaemonSet("foo2") + manager.dsStore.Add(ds) + manager.dsStore.Add(ds2) + addNodes(manager.nodeStore, 0, 2, nil) + + // These pods should be returned. + wantedPods := []*v1.Pod{ + newPod("matching-owned-0-", "node-0", simpleDaemonSetLabel, ds), + newPod("matching-orphan-0-", "node-0", simpleDaemonSetLabel, nil), + newPod("matching-owned-1-", "node-1", simpleDaemonSetLabel, ds), + newPod("matching-orphan-1-", "node-1", simpleDaemonSetLabel, nil), + } + failedPod := newPod("matching-owned-failed-pod-1-", "node-1", simpleDaemonSetLabel, ds) + failedPod.Status = v1.PodStatus{Phase: v1.PodFailed} + wantedPods = append(wantedPods, failedPod) + for _, pod := range wantedPods { + manager.podStore.Add(pod) + } + + // These pods should be ignored. 
+ ignoredPods := []*v1.Pod{ + newPod("non-matching-owned-0-", "node-0", simpleDaemonSetLabel2, ds), + newPod("non-matching-orphan-1-", "node-1", simpleDaemonSetLabel2, nil), + newPod("matching-owned-by-other-0-", "node-0", simpleDaemonSetLabel, ds2), + } + for _, pod := range ignoredPods { + manager.podStore.Add(pod) + } + + nodesToDaemonPods, err := manager.getNodesToDaemonPods(ds) + if err != nil { + t.Fatalf("getNodesToDaemonPods() error: %v", err) + } + gotPods := map[string]bool{} + for node, pods := range nodesToDaemonPods { + for _, pod := range pods { + if pod.Spec.NodeName != node { + t.Errorf("pod %v grouped into %v but belongs in %v", pod.Name, node, pod.Spec.NodeName) + } + gotPods[pod.Name] = true + } + } + for _, pod := range wantedPods { + if !gotPods[pod.Name] { + t.Errorf("expected pod %v but didn't get it", pod.Name) + } + delete(gotPods, pod.Name) + } + for podName := range gotPods { + t.Errorf("unexpected pod %v was returned", podName) + } +} diff --git a/pkg/controller/daemon/update.go b/pkg/controller/daemon/update.go index f13afba8330..44895a54a08 100644 --- a/pkg/controller/daemon/update.go +++ b/pkg/controller/daemon/update.go @@ -31,11 +31,9 @@ import ( // rollingUpdate deletes old daemon set pods making sure that no more than // ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable pods are unavailable -func (dsc *DaemonSetsController) rollingUpdate(ds *extensions.DaemonSet) error { - newPods, oldPods, err := dsc.getAllDaemonSetPods(ds) - allPods := append(oldPods, newPods...) - - maxUnavailable, numUnavailable, err := dsc.getUnavailableNumbers(ds, allPods) +func (dsc *DaemonSetsController) rollingUpdate(ds *extensions.DaemonSet, nodeToDaemonPods map[string][]*v1.Pod) error { + _, oldPods, err := dsc.getAllDaemonSetPods(ds, nodeToDaemonPods) + maxUnavailable, numUnavailable, err := dsc.getUnavailableNumbers(ds, nodeToDaemonPods) if err != nil { return fmt.Errorf("Couldn't get unavailable numbers: %v", err) } @@ -67,29 +65,23 @@ func (dsc *DaemonSetsController) rollingUpdate(ds *extensions.DaemonSet) error { return utilerrors.NewAggregate(errors) } -func (dsc *DaemonSetsController) getAllDaemonSetPods(ds *extensions.DaemonSet) ([]*v1.Pod, []*v1.Pod, error) { +func (dsc *DaemonSetsController) getAllDaemonSetPods(ds *extensions.DaemonSet, nodeToDaemonPods map[string][]*v1.Pod) ([]*v1.Pod, []*v1.Pod, error) { var newPods []*v1.Pod var oldPods []*v1.Pod - selector, err := metav1.LabelSelectorAsSelector(ds.Spec.Selector) - if err != nil { - return newPods, oldPods, err - } - daemonPods, err := dsc.podLister.Pods(ds.Namespace).List(selector) - if err != nil { - return newPods, oldPods, fmt.Errorf("Couldn't get list of pods for daemon set %#v: %v", ds, err) - } - for _, pod := range daemonPods { - if util.IsPodUpdated(ds.Spec.TemplateGeneration, pod) { - newPods = append(newPods, pod) - } else { - oldPods = append(oldPods, pod) + for _, pods := range nodeToDaemonPods { + for _, pod := range pods { + if util.IsPodUpdated(ds.Spec.TemplateGeneration, pod) { + newPods = append(newPods, pod) + } else { + oldPods = append(oldPods, pod) + } } } return newPods, oldPods, nil } -func (dsc *DaemonSetsController) getUnavailableNumbers(ds *extensions.DaemonSet, allPods []*v1.Pod) (int, int, error) { +func (dsc *DaemonSetsController) getUnavailableNumbers(ds *extensions.DaemonSet, nodeToDaemonPods map[string][]*v1.Pod) (int, int, error) { glog.V(4).Infof("Getting unavailable numbers") // TODO: get nodeList once in syncDaemonSet and pass it to other functions nodeList, err := 
dsc.nodeLister.List(labels.Everything()) @@ -97,11 +89,6 @@ func (dsc *DaemonSetsController) getUnavailableNumbers(ds *extensions.DaemonSet, return -1, -1, fmt.Errorf("couldn't get list of nodes during rolling update of daemon set %#v: %v", ds, err) } - nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds) - if err != nil { - return -1, -1, fmt.Errorf("couldn't get node to daemon pods mapping for daemon set %#v: %v", ds, err) - } - var numUnavailable, desiredNumberScheduled int for i := range nodeList { node := nodeList[i] From 10998118335e176519fb0672a63fb49901792038 Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Sat, 25 Feb 2017 17:34:14 -0800 Subject: [PATCH 04/10] DaemonSet: Use ControllerRef to route watch events. This is part of the completion of ControllerRef, as described here: https://github.com/kubernetes/community/blob/master/contributors/design-proposals/controller-ref.md#watches --- cmd/kube-controller-manager/app/extensions.go | 1 - .../app/options/options.go | 2 +- pkg/controller/daemon/BUILD | 1 + pkg/controller/daemon/daemoncontroller.go | 192 +++++++------ .../daemon/daemoncontroller_test.go | 261 +++++++++++++++++- 5 files changed, 348 insertions(+), 109 deletions(-) diff --git a/cmd/kube-controller-manager/app/extensions.go b/cmd/kube-controller-manager/app/extensions.go index f349ecbe212..a3218496fa7 100644 --- a/cmd/kube-controller-manager/app/extensions.go +++ b/cmd/kube-controller-manager/app/extensions.go @@ -36,7 +36,6 @@ func startDaemonSetController(ctx ControllerContext) (bool, error) { ctx.InformerFactory.Core().V1().Pods(), ctx.InformerFactory.Core().V1().Nodes(), ctx.ClientBuilder.ClientOrDie("daemon-set-controller"), - int(ctx.Options.LookupCacheSizeForDaemonSet), ).Run(int(ctx.Options.ConcurrentDaemonSetSyncs), ctx.Stop) return true, nil } diff --git a/cmd/kube-controller-manager/app/options/options.go b/cmd/kube-controller-manager/app/options/options.go index 196ba2742bd..4738d56b3af 100644 --- a/cmd/kube-controller-manager/app/options/options.go +++ b/cmd/kube-controller-manager/app/options/options.go @@ -134,7 +134,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet, allControllers []string, disabled fs.Int32Var(&s.ConcurrentSATokenSyncs, "concurrent-serviceaccount-token-syncs", s.ConcurrentSATokenSyncs, "The number of service account token objects that are allowed to sync concurrently. Larger number = more responsive token generation, but more CPU (and network) load") fs.Int32Var(&s.LookupCacheSizeForRC, "replication-controller-lookup-cache-size", s.LookupCacheSizeForRC, "This flag is deprecated and will be removed in future releases. ReplicationController no longer requires a lookup cache.") fs.Int32Var(&s.LookupCacheSizeForRS, "replicaset-lookup-cache-size", s.LookupCacheSizeForRS, "This flag is deprecated and will be removed in future releases. ReplicaSet no longer requires a lookup cache.") - fs.Int32Var(&s.LookupCacheSizeForDaemonSet, "daemonset-lookup-cache-size", s.LookupCacheSizeForDaemonSet, "The the size of lookup cache for daemonsets. Larger number = more responsive daemonsets, but more MEM load.") + fs.Int32Var(&s.LookupCacheSizeForDaemonSet, "daemonset-lookup-cache-size", s.LookupCacheSizeForDaemonSet, "This flag is deprecated and will be removed in future releases. 
DaemonSet no longer requires a lookup cache.") fs.DurationVar(&s.ServiceSyncPeriod.Duration, "service-sync-period", s.ServiceSyncPeriod.Duration, "The period for syncing services with their external load balancers") fs.DurationVar(&s.NodeSyncPeriod.Duration, "node-sync-period", 0, ""+ "This flag is deprecated and will be removed in future releases. See node-monitor-period for Node health checking or "+ diff --git a/pkg/controller/daemon/BUILD b/pkg/controller/daemon/BUILD index 3ca29d93d4d..d3ed2e45f51 100644 --- a/pkg/controller/daemon/BUILD +++ b/pkg/controller/daemon/BUILD @@ -79,6 +79,7 @@ go_test( "//vendor:k8s.io/client-go/testing", "//vendor:k8s.io/client-go/tools/cache", "//vendor:k8s.io/client-go/tools/record", + "//vendor:k8s.io/client-go/util/workqueue", ], ) diff --git a/pkg/controller/daemon/daemoncontroller.go b/pkg/controller/daemon/daemoncontroller.go index 32eb8778758..d850834911c 100644 --- a/pkg/controller/daemon/daemoncontroller.go +++ b/pkg/controller/daemon/daemoncontroller.go @@ -108,13 +108,11 @@ type DaemonSetsController struct { // Added as a member to the struct to allow injection for testing. nodeStoreSynced cache.InformerSynced - lookupCache *controller.MatchingCache - // DaemonSet keys that need to be synced. queue workqueue.RateLimitingInterface } -func NewDaemonSetsController(daemonSetInformer extensionsinformers.DaemonSetInformer, podInformer coreinformers.PodInformer, nodeInformer coreinformers.NodeInformer, kubeClient clientset.Interface, lookupCacheSize int) *DaemonSetsController { +func NewDaemonSetsController(daemonSetInformer extensionsinformers.DaemonSetInformer, podInformer coreinformers.PodInformer, nodeInformer coreinformers.NodeInformer, kubeClient clientset.Interface) *DaemonSetsController { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) // TODO: remove the wrapper when every clients have moved to use the clientset. @@ -144,21 +142,6 @@ func NewDaemonSetsController(daemonSetInformer extensionsinformers.DaemonSetInfo UpdateFunc: func(old, cur interface{}) { oldDS := old.(*extensions.DaemonSet) curDS := cur.(*extensions.DaemonSet) - // We should invalidate the whole lookup cache if a DS's selector has been updated. - // - // Imagine that you have two RSs: - // * old DS1 - // * new DS2 - // You also have a pod that is attached to DS2 (because it doesn't match DS1 selector). 
- // Now imagine that you are changing DS1 selector so that it is now matching that pod, - // in such case we must invalidate the whole cache so that pod could be adopted by DS1 - // - // This makes the lookup cache less helpful, but selector update does not happen often, - // so it's not a big problem - if !reflect.DeepEqual(oldDS.Spec.Selector, curDS.Spec.Selector) { - dsc.lookupCache.InvalidateAll() - } - glog.V(4).Infof("Updating daemon set %s", oldDS.Name) dsc.enqueueDaemonSet(curDS) }, @@ -187,7 +170,6 @@ func NewDaemonSetsController(daemonSetInformer extensionsinformers.DaemonSetInfo dsc.syncHandler = dsc.syncDaemonSet dsc.enqueueDaemonSet = dsc.enqueue - dsc.lookupCache = controller.NewMatchingCache(lookupCacheSize) return dsc } @@ -276,78 +258,57 @@ func (dsc *DaemonSetsController) enqueueDaemonSetAfter(obj interface{}, after ti dsc.queue.AddAfter(key, after) } -func (dsc *DaemonSetsController) getPodDaemonSet(pod *v1.Pod) *extensions.DaemonSet { - // look up in the cache, if cached and the cache is valid, just return cached value - if obj, cached := dsc.lookupCache.GetMatchingObject(pod); cached { - ds, ok := obj.(*extensions.DaemonSet) - if !ok { - // This should not happen - utilruntime.HandleError(fmt.Errorf("lookup cache does not return a DaemonSet object")) - return nil - } - if dsc.isCacheValid(pod, ds) { - return ds - } - } +// getPodDaemonSets returns a list of DaemonSets that potentially match the pod. +func (dsc *DaemonSetsController) getPodDaemonSets(pod *v1.Pod) []*extensions.DaemonSet { sets, err := dsc.dsLister.GetPodDaemonSets(pod) if err != nil { glog.V(4).Infof("No daemon sets found for pod %v, daemon set controller will avoid syncing", pod.Name) return nil } if len(sets) > 1 { - // More than two items in this list indicates user error. If two daemon - // sets overlap, sort by creation timestamp, subsort by name, then pick - // the first. + // ControllerRef will ensure we don't do anything crazy, but more than one + // item in this list nevertheless constitutes user error. utilruntime.HandleError(fmt.Errorf("user error! more than one daemon is selecting pods with labels: %+v", pod.Labels)) - sort.Sort(byCreationTimestamp(sets)) } - - // update lookup cache - dsc.lookupCache.Update(pod, sets[0]) - - return sets[0] -} - -// isCacheValid check if the cache is valid -func (dsc *DaemonSetsController) isCacheValid(pod *v1.Pod, cachedDS *extensions.DaemonSet) bool { - _, err := dsc.dsLister.DaemonSets(cachedDS.Namespace).Get(cachedDS.Name) - // ds has been deleted or updated, cache is invalid - if err != nil || !isDaemonSetMatch(pod, cachedDS) { - return false - } - return true -} - -// isDaemonSetMatch take a Pod and DaemonSet, return whether the Pod and DaemonSet are matching -// TODO(mqliang): This logic is a copy from GetPodDaemonSets(), remove the duplication -func isDaemonSetMatch(pod *v1.Pod, ds *extensions.DaemonSet) bool { - if ds.Namespace != pod.Namespace { - return false - } - selector, err := metav1.LabelSelectorAsSelector(ds.Spec.Selector) - if err != nil { - err = fmt.Errorf("invalid selector: %v", err) - return false - } - - // If a ReplicaSet with a nil or empty selector creeps in, it should match nothing, not everything. 
- if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) { - return false - } - return true + return sets } func (dsc *DaemonSetsController) addPod(obj interface{}) { pod := obj.(*v1.Pod) glog.V(4).Infof("Pod %s added.", pod.Name) - if ds := dsc.getPodDaemonSet(pod); ds != nil { + + if pod.DeletionTimestamp != nil { + // on a restart of the controller manager, it's possible a new pod shows up in a state that + // is already pending deletion. Prevent the pod from being a creation observation. + dsc.deletePod(pod) + return + } + + // If it has a ControllerRef, that's all that matters. + if controllerRef := controller.GetControllerOf(pod); controllerRef != nil { + if controllerRef.Kind != controllerKind.Kind { + // It's controlled by a different type of controller. + return + } + ds, err := dsc.dsLister.DaemonSets(pod.Namespace).Get(controllerRef.Name) + if err != nil { + return + } dsKey, err := controller.KeyFunc(ds) if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", ds, err)) return } dsc.expectations.CreationObserved(dsKey) dsc.enqueueDaemonSet(ds) + return + } + + // Otherwise, it's an orphan. Get a list of all matching DaemonSets and sync + // them to see if anyone wants to adopt it. + // DO NOT observe creation because no controller should be waiting for an + // orphan. + for _, ds := range dsc.getPodDaemonSets(pod) { + dsc.enqueueDaemonSet(ds) } } @@ -364,22 +325,43 @@ func (dsc *DaemonSetsController) updatePod(old, cur interface{}) { } glog.V(4).Infof("Pod %s updated.", curPod.Name) changedToReady := !v1.IsPodReady(oldPod) && v1.IsPodReady(curPod) - if curDS := dsc.getPodDaemonSet(curPod); curDS != nil { - dsc.enqueueDaemonSet(curDS) + labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels) - // See https://github.com/kubernetes/kubernetes/pull/38076 for more details - if changedToReady && curDS.Spec.MinReadySeconds > 0 { - dsc.enqueueDaemonSetAfter(curDS, time.Duration(curDS.Spec.MinReadySeconds)*time.Second) + curControllerRef := controller.GetControllerOf(curPod) + oldControllerRef := controller.GetControllerOf(oldPod) + controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef) + if controllerRefChanged && + oldControllerRef != nil && oldControllerRef.Kind == controllerKind.Kind { + // The ControllerRef was changed. Sync the old controller, if any. + ds, err := dsc.dsLister.DaemonSets(oldPod.Namespace).Get(oldControllerRef.Name) + if err == nil { + dsc.enqueueDaemonSet(ds) } } - // If the labels have not changed, then the daemon set responsible for - // the pod is the same as it was before. In that case we have enqueued the daemon - // set above, and do not have to enqueue the set again. - if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) { - // It's ok if both oldDS and curDS are the same, because curDS will set - // the expectations on its run so oldDS will have no effect. - if oldDS := dsc.getPodDaemonSet(oldPod); oldDS != nil { - dsc.enqueueDaemonSet(oldDS) + + // If it has a ControllerRef, that's all that matters. + if curControllerRef != nil { + if curControllerRef.Kind != controllerKind.Kind { + // It's controlled by a different type of controller. 
+ return + } + ds, err := dsc.dsLister.DaemonSets(curPod.Namespace).Get(curControllerRef.Name) + if err != nil { + return + } + dsc.enqueueDaemonSet(ds) + // See https://github.com/kubernetes/kubernetes/pull/38076 for more details + if changedToReady && ds.Spec.MinReadySeconds > 0 { + dsc.enqueueDaemonSetAfter(ds, time.Duration(ds.Spec.MinReadySeconds)*time.Second) + } + return + } + + // Otherwise, it's an orphan. If anything changed, sync matching controllers + // to see if anyone wants to adopt it now. + if labelChanged || controllerRefChanged { + for _, ds := range dsc.getPodDaemonSets(curPod) { + dsc.enqueueDaemonSet(ds) } } } @@ -394,25 +376,37 @@ func (dsc *DaemonSetsController) deletePod(obj interface{}) { if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj)) + utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj)) return } pod, ok = tombstone.Obj.(*v1.Pod) if !ok { - utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %#v", obj)) + utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj)) return } } glog.V(4).Infof("Pod %s deleted.", pod.Name) - if ds := dsc.getPodDaemonSet(pod); ds != nil { - dsKey, err := controller.KeyFunc(ds) - if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", ds, err)) - return - } - dsc.expectations.DeletionObserved(dsKey) - dsc.enqueueDaemonSet(ds) + + controllerRef := controller.GetControllerOf(pod) + if controllerRef == nil { + // No controller should care about orphans being deleted. + return } + if controllerRef.Kind != controllerKind.Kind { + // It's controlled by a different type of controller. + return + } + + ds, err := dsc.dsLister.DaemonSets(pod.Namespace).Get(controllerRef.Name) + if err != nil { + return + } + dsKey, err := controller.KeyFunc(ds) + if err != nil { + return + } + dsc.expectations.DeletionObserved(dsKey) + dsc.enqueueDaemonSet(ds) } func (dsc *DaemonSetsController) addNode(obj interface{}) { @@ -481,7 +475,7 @@ func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *extensions.DaemonSet) return nil, err } // Use ControllerRefManager to adopt/orphan as needed. - cm := controller.NewPodControllerRefManager(dsc.podControl, ds, selector, ControllerKind) + cm := controller.NewPodControllerRefManager(dsc.podControl, ds, selector, controllerKind) claimedPods, err := cm.ClaimPods(pods) if err != nil { return nil, err @@ -867,7 +861,7 @@ func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *exten } // ignore pods that belong to the daemonset when taking into account whether // a daemonset should bind to a node. 
- if pds := dsc.getPodDaemonSet(pod); pds != nil && ds.Name == pds.Name { + if controllerRef := controller.GetControllerOf(pod); controllerRef != nil && controllerRef.UID == ds.UID { continue } pods = append(pods, pod) @@ -975,8 +969,8 @@ func Predicates(pod *v1.Pod, nodeInfo *schedulercache.NodeInfo) (bool, []algorit func newControllerRef(ds *extensions.DaemonSet) *metav1.OwnerReference { isController := true return &metav1.OwnerReference{ - APIVersion: ControllerKind.GroupVersion().String(), - Kind: ControllerKind.Kind, + APIVersion: controllerKind.GroupVersion().String(), + Kind: controllerKind.Kind, Name: ds.Name, UID: ds.UID, Controller: &isController, diff --git a/pkg/controller/daemon/daemoncontroller_test.go b/pkg/controller/daemon/daemoncontroller_test.go index ea62750ae50..b41ba545840 100644 --- a/pkg/controller/daemon/daemoncontroller_test.go +++ b/pkg/controller/daemon/daemoncontroller_test.go @@ -18,6 +18,9 @@ package daemon import ( "fmt" + "reflect" + "sort" + "strconv" "sync" "testing" @@ -30,6 +33,7 @@ import ( core "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/api/v1" @@ -248,7 +252,6 @@ func newTestController(initialObjects ...runtime.Object) (*daemonSetsController, informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Nodes(), clientset, - 0, ) manager.eventRecorder = record.NewFakeRecorder(100) @@ -542,16 +545,17 @@ func TestPortConflictWithSameDaemonPodDoesNotDeletePod(t *testing.T) { manager, podControl, _ := newTestController() node := newNode("port-conflict", nil) manager.nodeStore.Add(node) - manager.podStore.Add(&v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Labels: simpleDaemonSetLabel, - Namespace: metav1.NamespaceDefault, - }, - Spec: podSpec, - }) ds := newDaemonSet("foo") ds.Spec.Template.Spec = podSpec manager.dsStore.Add(ds) + manager.podStore.Add(&v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: simpleDaemonSetLabel, + Namespace: metav1.NamespaceDefault, + OwnerReferences: []metav1.OwnerReference{*newControllerRef(ds)}, + }, + Spec: podSpec, + }) syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0) } @@ -1244,3 +1248,244 @@ func TestGetNodesToDaemonPods(t *testing.T) { t.Errorf("unexpected pod %v was returned", podName) } } + +func TestAddPod(t *testing.T) { + manager, _, _ := newTestController() + ds1 := newDaemonSet("foo1") + ds2 := newDaemonSet("foo2") + manager.dsStore.Add(ds1) + manager.dsStore.Add(ds2) + + pod1 := newPod("pod1-", "node-0", simpleDaemonSetLabel, ds1) + manager.addPod(pod1) + if got, want := manager.queue.Len(), 1; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + key, done := manager.queue.Get() + if key == nil || done { + t.Fatalf("failed to enqueue controller for pod %v", pod1.Name) + } + expectedKey, _ := controller.KeyFunc(ds1) + if got, want := key.(string), expectedKey; got != want { + t.Errorf("queue.Get() = %v, want %v", got, want) + } + + pod2 := newPod("pod2-", "node-0", simpleDaemonSetLabel, ds2) + manager.addPod(pod2) + if got, want := manager.queue.Len(), 1; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + key, done = manager.queue.Get() + if key == nil || done { + t.Fatalf("failed to enqueue controller for pod %v", pod2.Name) + } + expectedKey, _ = controller.KeyFunc(ds2) + if got, want := key.(string), expectedKey; got != want { + t.Errorf("queue.Get() = %v, want %v", got, want) + } +} + +func 
TestAddPodOrphan(t *testing.T) { + manager, _, _ := newTestController() + ds1 := newDaemonSet("foo1") + ds2 := newDaemonSet("foo2") + ds3 := newDaemonSet("foo3") + ds3.Spec.Selector.MatchLabels = simpleDaemonSetLabel2 + manager.dsStore.Add(ds1) + manager.dsStore.Add(ds2) + manager.dsStore.Add(ds3) + + // Make pod an orphan. Expect matching sets to be queued. + pod := newPod("pod1-", "node-0", simpleDaemonSetLabel, nil) + manager.addPod(pod) + if got, want := manager.queue.Len(), 2; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + if got, want := getQueuedKeys(manager.queue), []string{"default/foo1", "default/foo2"}; !reflect.DeepEqual(got, want) { + t.Errorf("getQueuedKeys() = %v, want %v", got, want) + } +} + +func TestUpdatePod(t *testing.T) { + manager, _, _ := newTestController() + ds1 := newDaemonSet("foo1") + ds2 := newDaemonSet("foo2") + manager.dsStore.Add(ds1) + manager.dsStore.Add(ds2) + + pod1 := newPod("pod1-", "node-0", simpleDaemonSetLabel, ds1) + prev := *pod1 + bumpResourceVersion(pod1) + manager.updatePod(&prev, pod1) + if got, want := manager.queue.Len(), 1; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + key, done := manager.queue.Get() + if key == nil || done { + t.Fatalf("failed to enqueue controller for pod %v", pod1.Name) + } + expectedKey, _ := controller.KeyFunc(ds1) + if got, want := key.(string), expectedKey; got != want { + t.Errorf("queue.Get() = %v, want %v", got, want) + } + + pod2 := newPod("pod2-", "node-0", simpleDaemonSetLabel, ds2) + prev = *pod2 + bumpResourceVersion(pod2) + manager.updatePod(&prev, pod2) + if got, want := manager.queue.Len(), 1; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + key, done = manager.queue.Get() + if key == nil || done { + t.Fatalf("failed to enqueue controller for pod %v", pod2.Name) + } + expectedKey, _ = controller.KeyFunc(ds2) + if got, want := key.(string), expectedKey; got != want { + t.Errorf("queue.Get() = %v, want %v", got, want) + } +} + +func TestUpdatePodOrphanSameLabels(t *testing.T) { + manager, _, _ := newTestController() + ds1 := newDaemonSet("foo1") + ds2 := newDaemonSet("foo2") + manager.dsStore.Add(ds1) + manager.dsStore.Add(ds2) + + pod := newPod("pod1-", "node-0", simpleDaemonSetLabel, nil) + prev := *pod + bumpResourceVersion(pod) + manager.updatePod(&prev, pod) + if got, want := manager.queue.Len(), 0; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } +} + +func TestUpdatePodOrphanWithNewLabels(t *testing.T) { + manager, _, _ := newTestController() + ds1 := newDaemonSet("foo1") + ds2 := newDaemonSet("foo2") + manager.dsStore.Add(ds1) + manager.dsStore.Add(ds2) + + pod := newPod("pod1-", "node-0", simpleDaemonSetLabel, nil) + prev := *pod + prev.Labels = map[string]string{"foo2": "bar2"} + bumpResourceVersion(pod) + manager.updatePod(&prev, pod) + if got, want := manager.queue.Len(), 2; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + if got, want := getQueuedKeys(manager.queue), []string{"default/foo1", "default/foo2"}; !reflect.DeepEqual(got, want) { + t.Errorf("getQueuedKeys() = %v, want %v", got, want) + } +} + +func TestUpdatePodChangeControllerRef(t *testing.T) { + manager, _, _ := newTestController() + ds1 := newDaemonSet("foo1") + ds2 := newDaemonSet("foo2") + manager.dsStore.Add(ds1) + manager.dsStore.Add(ds2) + + pod := newPod("pod1-", "node-0", simpleDaemonSetLabel, ds1) + prev := *pod + prev.OwnerReferences = []metav1.OwnerReference{*newControllerRef(ds2)} + bumpResourceVersion(pod) 
+ manager.updatePod(&prev, pod) + if got, want := manager.queue.Len(), 2; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } +} + +func TestUpdatePodControllerRefRemoved(t *testing.T) { + manager, _, _ := newTestController() + ds1 := newDaemonSet("foo1") + ds2 := newDaemonSet("foo2") + manager.dsStore.Add(ds1) + manager.dsStore.Add(ds2) + + pod := newPod("pod1-", "node-0", simpleDaemonSetLabel, ds1) + prev := *pod + pod.OwnerReferences = nil + bumpResourceVersion(pod) + manager.updatePod(&prev, pod) + if got, want := manager.queue.Len(), 2; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } +} + +func TestDeletePod(t *testing.T) { + manager, _, _ := newTestController() + ds1 := newDaemonSet("foo1") + ds2 := newDaemonSet("foo2") + manager.dsStore.Add(ds1) + manager.dsStore.Add(ds2) + + pod1 := newPod("pod1-", "node-0", simpleDaemonSetLabel, ds1) + manager.deletePod(pod1) + if got, want := manager.queue.Len(), 1; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + key, done := manager.queue.Get() + if key == nil || done { + t.Fatalf("failed to enqueue controller for pod %v", pod1.Name) + } + expectedKey, _ := controller.KeyFunc(ds1) + if got, want := key.(string), expectedKey; got != want { + t.Errorf("queue.Get() = %v, want %v", got, want) + } + + pod2 := newPod("pod2-", "node-0", simpleDaemonSetLabel, ds2) + manager.deletePod(pod2) + if got, want := manager.queue.Len(), 1; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } + key, done = manager.queue.Get() + if key == nil || done { + t.Fatalf("failed to enqueue controller for pod %v", pod2.Name) + } + expectedKey, _ = controller.KeyFunc(ds2) + if got, want := key.(string), expectedKey; got != want { + t.Errorf("queue.Get() = %v, want %v", got, want) + } +} + +func TestDeletePodOrphan(t *testing.T) { + manager, _, _ := newTestController() + ds1 := newDaemonSet("foo1") + ds2 := newDaemonSet("foo2") + ds3 := newDaemonSet("foo3") + ds3.Spec.Selector.MatchLabels = simpleDaemonSetLabel2 + manager.dsStore.Add(ds1) + manager.dsStore.Add(ds2) + manager.dsStore.Add(ds3) + + pod := newPod("pod1-", "node-0", simpleDaemonSetLabel, nil) + manager.deletePod(pod) + if got, want := manager.queue.Len(), 0; got != want { + t.Fatalf("queue.Len() = %v, want %v", got, want) + } +} + +func bumpResourceVersion(obj metav1.Object) { + ver, _ := strconv.ParseInt(obj.GetResourceVersion(), 10, 32) + obj.SetResourceVersion(strconv.FormatInt(ver+1, 10)) +} + +// getQueuedKeys returns a sorted list of keys in the queue. +// It can be used to quickly check that multiple keys are in there. +func getQueuedKeys(queue workqueue.RateLimitingInterface) []string { + var keys []string + count := queue.Len() + for i := 0; i < count; i++ { + key, done := queue.Get() + if done { + return keys + } + keys = append(keys, key.(string)) + } + sort.Strings(keys) + return keys +} From c288f52d2f3a05c305c9e5fb54ca01374b84ffa3 Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Sat, 25 Feb 2017 17:34:50 -0800 Subject: [PATCH 05/10] DaemonSet: Update Lister documentation for ControllerRef. The DaemonSet Listers still use selectors, because this is the behavior expected by callers. This clarifies the meaning of the returned list. Some callers may need to switch to using GetControllerOf() instead, but that is a separate, case-by-case issue. 
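For illustration only, a sketch of what such a switch could look like. The helper below is hypothetical and not part of this change; it assumes the extensions/v1beta1 DaemonSet lister (aliased here as extensionslisters), the extensions/v1beta1 API types (aliased as v1beta1), and the GetControllerOf helper from pkg/controller that this series already uses. A caller that needs the one DaemonSet actually managing a Pod, rather than every selector match, can filter the Lister's result by the Pod's ControllerRef UID:

    // getManagingDaemonSet is a hypothetical helper. It returns the
    // DaemonSet referenced by the Pod's ControllerRef, or nil if the Pod
    // is an orphan or its controller is not among the selector matches.
    func getManagingDaemonSet(lister extensionslisters.DaemonSetLister, pod *v1.Pod) *v1beta1.DaemonSet {
    	controllerRef := controller.GetControllerOf(pod)
    	if controllerRef == nil {
    		// Orphan: no controller manages it, regardless of selectors.
    		return nil
    	}
    	sets, err := lister.GetPodDaemonSets(pod)
    	if err != nil {
    		// No DaemonSet selectors match this Pod at all.
    		return nil
    	}
    	for _, ds := range sets {
    		if ds.UID == controllerRef.UID {
    			// Matching on UID rather than name stays correct even if a
    			// DaemonSet with the same name was deleted and recreated.
    			return ds
    		}
    	}
    	return nil
    }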
---
 .../extensions/internalversion/daemonset_expansion.go | 5 +++--
 pkg/client/listers/extensions/v1beta1/daemonset_expansion.go | 5 +++--
 .../listers/extensions/v1beta1/daemonset_expansion.go | 5 +++--
 3 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/pkg/client/listers/extensions/internalversion/daemonset_expansion.go b/pkg/client/listers/extensions/internalversion/daemonset_expansion.go
index c5073f3bca6..15c7cc7ea89 100644
--- a/pkg/client/listers/extensions/internalversion/daemonset_expansion.go
+++ b/pkg/client/listers/extensions/internalversion/daemonset_expansion.go
@@ -35,8 +35,9 @@ type DaemonSetListerExpansion interface {
 // DaemonSetNamespaeLister.
 type DaemonSetNamespaceListerExpansion interface{}
 
-// GetPodDaemonSets returns a list of daemon sets managing a pod.
-// Returns an error if and only if no matching daemon sets are found.
+// GetPodDaemonSets returns a list of DaemonSets that potentially match a pod.
+// Only the one specified in the Pod's ControllerRef will actually manage it.
+// Returns an error only if no matching DaemonSets are found.
 func (s *daemonSetLister) GetPodDaemonSets(pod *api.Pod) ([]*extensions.DaemonSet, error) {
 	var selector labels.Selector
 	var daemonSet *extensions.DaemonSet
diff --git a/pkg/client/listers/extensions/v1beta1/daemonset_expansion.go b/pkg/client/listers/extensions/v1beta1/daemonset_expansion.go
index d3497414e99..b2144df4afc 100644
--- a/pkg/client/listers/extensions/v1beta1/daemonset_expansion.go
+++ b/pkg/client/listers/extensions/v1beta1/daemonset_expansion.go
@@ -35,8 +35,9 @@ type DaemonSetListerExpansion interface {
 // DaemonSetNamespaeLister.
 type DaemonSetNamespaceListerExpansion interface{}
 
-// GetPodDaemonSets returns a list of daemon sets managing a pod.
-// Returns an error if and only if no matching daemon sets are found.
+// GetPodDaemonSets returns a list of DaemonSets that potentially match a pod.
+// Only the one specified in the Pod's ControllerRef will actually manage it.
+// Returns an error only if no matching DaemonSets are found.
 func (s *daemonSetLister) GetPodDaemonSets(pod *v1.Pod) ([]*v1beta1.DaemonSet, error) {
 	var selector labels.Selector
 	var daemonSet *v1beta1.DaemonSet
diff --git a/staging/src/k8s.io/client-go/listers/extensions/v1beta1/daemonset_expansion.go b/staging/src/k8s.io/client-go/listers/extensions/v1beta1/daemonset_expansion.go
index dc17c105cf8..019cdc40a8a 100644
--- a/staging/src/k8s.io/client-go/listers/extensions/v1beta1/daemonset_expansion.go
+++ b/staging/src/k8s.io/client-go/listers/extensions/v1beta1/daemonset_expansion.go
@@ -35,8 +35,9 @@ type DaemonSetListerExpansion interface {
 // DaemonSetNamespaeLister.
 type DaemonSetNamespaceListerExpansion interface{}
 
-// GetPodDaemonSets returns a list of daemon sets managing a pod.
-// Returns an error if and only if no matching daemon sets are found.
+// GetPodDaemonSets returns a list of DaemonSets that potentially match a pod.
+// Only the one specified in the Pod's ControllerRef will actually manage it.
+// Returns an error only if no matching DaemonSets are found.
 func (s *daemonSetLister) GetPodDaemonSets(pod *v1.Pod) ([]*v1beta1.DaemonSet, error) {
 	var selector labels.Selector
 	var daemonSet *v1beta1.DaemonSet

From ab5a82d6e6fd9afc322b82ae448b6930c98d540b Mon Sep 17 00:00:00 2001
From: Anthony Yeh
Date: Thu, 2 Mar 2017 10:35:21 -0800
Subject: [PATCH 06/10] DaemonSet: Don't log Pod events unless some DaemonSet cares.
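
In outline, each Pod event handler now decides whether any DaemonSet
cares about the Pod before emitting its V(4) log line, instead of
logging unconditionally at the top of the handler. A condensed sketch
of the pattern as applied to addPod (abridged from the diff below; not
a complete handler):

    func (dsc *DaemonSetsController) addPod(obj interface{}) {
    	pod := obj.(*v1.Pod)
    	if controllerRef := controller.GetControllerOf(pod); controllerRef != nil {
    		if controllerRef.Kind != controllerKind.Kind {
    			// Owned by some other controller type: stay silent.
    			return
    		}
    		glog.V(4).Infof("Pod %s added.", pod.Name) // log only once we know we care
    		// ... look up and enqueue the owning DaemonSet ...
    		return
    	}
    	// Orphan: log only if at least one DaemonSet's selector matches.
    	dss := dsc.getPodDaemonSets(pod)
    	if len(dss) == 0 {
    		return
    	}
    	glog.V(4).Infof("Orphan Pod %s added.", pod.Name)
    	// ... enqueue all matching DaemonSets for possible adoption ...
    }

The same check-before-logging order is applied to updatePod and
deletePod.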
---
 pkg/controller/daemon/daemoncontroller.go | 21 +++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git a/pkg/controller/daemon/daemoncontroller.go b/pkg/controller/daemon/daemoncontroller.go
index d850834911c..9a573c229be 100644
--- a/pkg/controller/daemon/daemoncontroller.go
+++ b/pkg/controller/daemon/daemoncontroller.go
@@ -262,7 +262,6 @@ func (dsc *DaemonSetsController) enqueueDaemonSetAfter(obj interface{}, after ti
 func (dsc *DaemonSetsController) getPodDaemonSets(pod *v1.Pod) []*extensions.DaemonSet {
 	sets, err := dsc.dsLister.GetPodDaemonSets(pod)
 	if err != nil {
-		glog.V(4).Infof("No daemon sets found for pod %v, daemon set controller will avoid syncing", pod.Name)
 		return nil
 	}
 	if len(sets) > 1 {
@@ -275,7 +274,6 @@ func (dsc *DaemonSetsController) getPodDaemonSets(pod *v1.Pod) []*extensions.Dae
 
 func (dsc *DaemonSetsController) addPod(obj interface{}) {
 	pod := obj.(*v1.Pod)
-	glog.V(4).Infof("Pod %s added.", pod.Name)
 
 	if pod.DeletionTimestamp != nil {
 		// on a restart of the controller manager, it's possible a new pod shows up in a state that
@@ -290,6 +288,7 @@ func (dsc *DaemonSetsController) addPod(obj interface{}) {
 		// It's controlled by a different type of controller.
 		return
 	}
+	glog.V(4).Infof("Pod %s added.", pod.Name)
 	ds, err := dsc.dsLister.DaemonSets(pod.Namespace).Get(controllerRef.Name)
 	if err != nil {
 		return
@@ -307,7 +306,12 @@ func (dsc *DaemonSetsController) addPod(obj interface{}) {
 	// them to see if anyone wants to adopt it.
 	// DO NOT observe creation because no controller should be waiting for an
 	// orphan.
-	for _, ds := range dsc.getPodDaemonSets(pod) {
+	dss := dsc.getPodDaemonSets(pod)
+	if len(dss) == 0 {
+		return
+	}
+	glog.V(4).Infof("Orphan Pod %s added.", pod.Name)
+	for _, ds := range dss {
 		dsc.enqueueDaemonSet(ds)
 	}
 }
@@ -323,7 +327,6 @@ func (dsc *DaemonSetsController) updatePod(old, cur interface{}) {
 		// Two different versions of the same pod will always have different RVs.
 		return
 	}
-	glog.V(4).Infof("Pod %s updated.", curPod.Name)
 
 	changedToReady := !v1.IsPodReady(oldPod) && v1.IsPodReady(curPod)
 	labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
@@ -345,6 +348,7 @@ func (dsc *DaemonSetsController) updatePod(old, cur interface{}) {
 		// It's controlled by a different type of controller.
 		return
 	}
+	glog.V(4).Infof("Pod %s updated.", curPod.Name)
 	ds, err := dsc.dsLister.DaemonSets(curPod.Namespace).Get(curControllerRef.Name)
 	if err != nil {
 		return
@@ -359,8 +363,13 @@ func (dsc *DaemonSetsController) updatePod(old, cur interface{}) {
 
 	// Otherwise, it's an orphan. If anything changed, sync matching controllers
 	// to see if anyone wants to adopt it now.
+	dss := dsc.getPodDaemonSets(curPod)
+	if len(dss) == 0 {
+		return
+	}
+	glog.V(4).Infof("Orphan Pod %s updated.", curPod.Name)
 	if labelChanged || controllerRefChanged {
-		for _, ds := range dsc.getPodDaemonSets(curPod) {
+		for _, ds := range dss {
 			dsc.enqueueDaemonSet(ds)
 		}
 	}
@@ -385,7 +394,6 @@ func (dsc *DaemonSetsController) deletePod(obj interface{}) {
 			return
 		}
 	}
-	glog.V(4).Infof("Pod %s deleted.", pod.Name)
 
 	controllerRef := controller.GetControllerOf(pod)
 	if controllerRef == nil {
@@ -396,6 +404,7 @@ func (dsc *DaemonSetsController) deletePod(obj interface{}) {
 		// It's controlled by a different type of controller.
 		return
 	}
+	glog.V(4).Infof("Pod %s deleted.", pod.Name)
 
 	ds, err := dsc.dsLister.DaemonSets(pod.Namespace).Get(controllerRef.Name)
 	if err != nil {

From 97c363a3e084b0d66bda638d3a645ddf80e12d1c Mon Sep 17 00:00:00 2001
From: Anthony Yeh
Date: Thu, 2 Mar 2017 10:39:52 -0800
Subject: [PATCH 07/10] DaemonSet: Always set BlockOwnerDeletion in ControllerRef.

---
 pkg/controller/daemon/daemoncontroller.go | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/pkg/controller/daemon/daemoncontroller.go b/pkg/controller/daemon/daemoncontroller.go
index 9a573c229be..f1a1f8f2347 100644
--- a/pkg/controller/daemon/daemoncontroller.go
+++ b/pkg/controller/daemon/daemoncontroller.go
@@ -976,13 +976,15 @@ func Predicates(pod *v1.Pod, nodeInfo *schedulercache.NodeInfo) (bool, []algorit
 
 // newControllerRef creates a ControllerRef pointing to the given DaemonSet.
 func newControllerRef(ds *extensions.DaemonSet) *metav1.OwnerReference {
+	blockOwnerDeletion := true
 	isController := true
 	return &metav1.OwnerReference{
-		APIVersion: controllerKind.GroupVersion().String(),
-		Kind:       controllerKind.Kind,
-		Name:       ds.Name,
-		UID:        ds.UID,
-		Controller: &isController,
+		APIVersion:         controllerKind.GroupVersion().String(),
+		Kind:               controllerKind.Kind,
+		Name:               ds.Name,
+		UID:                ds.UID,
+		BlockOwnerDeletion: &blockOwnerDeletion,
+		Controller:         &isController,
 	}
 }

From 182753f841901e458108e4dee9ded248768bb536 Mon Sep 17 00:00:00 2001
From: Anthony Yeh
Date: Mon, 6 Mar 2017 10:48:10 -0800
Subject: [PATCH 08/10] DaemonSet: Check that ControllerRef UID matches.

---
 pkg/controller/daemon/daemoncontroller.go | 56 +++++++++++++----------
 1 file changed, 31 insertions(+), 25 deletions(-)

diff --git a/pkg/controller/daemon/daemoncontroller.go b/pkg/controller/daemon/daemoncontroller.go
index f1a1f8f2347..419ecdbcd6f 100644
--- a/pkg/controller/daemon/daemoncontroller.go
+++ b/pkg/controller/daemon/daemoncontroller.go
@@ -284,19 +284,15 @@ func (dsc *DaemonSetsController) addPod(obj interface{}) {
 
 	// If it has a ControllerRef, that's all that matters.
 	if controllerRef := controller.GetControllerOf(pod); controllerRef != nil {
-		if controllerRef.Kind != controllerKind.Kind {
-			// It's controlled by a different type of controller.
-			return
-		}
-		glog.V(4).Infof("Pod %s added.", pod.Name)
-		ds, err := dsc.dsLister.DaemonSets(pod.Namespace).Get(controllerRef.Name)
-		if err != nil {
+		ds := dsc.resolveControllerRef(pod.Namespace, controllerRef)
+		if ds == nil {
 			return
 		}
 		dsKey, err := controller.KeyFunc(ds)
 		if err != nil {
 			return
 		}
+		glog.V(4).Infof("Pod %s added.", pod.Name)
 		dsc.expectations.CreationObserved(dsKey)
 		dsc.enqueueDaemonSet(ds)
 		return
@@ -333,26 +329,20 @@ func (dsc *DaemonSetsController) updatePod(old, cur interface{}) {
 	curControllerRef := controller.GetControllerOf(curPod)
 	oldControllerRef := controller.GetControllerOf(oldPod)
 	controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
-	if controllerRefChanged &&
-		oldControllerRef != nil && oldControllerRef.Kind == controllerKind.Kind {
+	if controllerRefChanged && oldControllerRef != nil {
 		// The ControllerRef was changed. Sync the old controller, if any.
-		ds, err := dsc.dsLister.DaemonSets(oldPod.Namespace).Get(oldControllerRef.Name)
-		if err == nil {
+		if ds := dsc.resolveControllerRef(oldPod.Namespace, oldControllerRef); ds != nil {
 			dsc.enqueueDaemonSet(ds)
 		}
 	}
 
 	// If it has a ControllerRef, that's all that matters.
 	if curControllerRef != nil {
-		if curControllerRef.Kind != controllerKind.Kind {
-			// It's controlled by a different type of controller.
+		ds := dsc.resolveControllerRef(curPod.Namespace, curControllerRef)
+		if ds == nil {
 			return
 		}
 		glog.V(4).Infof("Pod %s updated.", curPod.Name)
-		ds, err := dsc.dsLister.DaemonSets(curPod.Namespace).Get(curControllerRef.Name)
-		if err != nil {
-			return
-		}
 		dsc.enqueueDaemonSet(ds)
 		// See https://github.com/kubernetes/kubernetes/pull/38076 for more details
 		if changedToReady && ds.Spec.MinReadySeconds > 0 {
@@ -400,20 +390,15 @@ func (dsc *DaemonSetsController) deletePod(obj interface{}) {
 		// No controller should care about orphans being deleted.
 		return
 	}
-	if controllerRef.Kind != controllerKind.Kind {
-		// It's controlled by a different type of controller.
-		return
-	}
-	glog.V(4).Infof("Pod %s deleted.", pod.Name)
-
-	ds, err := dsc.dsLister.DaemonSets(pod.Namespace).Get(controllerRef.Name)
-	if err != nil {
+	ds := dsc.resolveControllerRef(pod.Namespace, controllerRef)
+	if ds == nil {
 		return
 	}
 	dsKey, err := controller.KeyFunc(ds)
 	if err != nil {
 		return
 	}
+	glog.V(4).Infof("Pod %s deleted.", pod.Name)
 	dsc.expectations.DeletionObserved(dsKey)
 	dsc.enqueueDaemonSet(ds)
 }
@@ -498,6 +483,27 @@ func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *extensions.DaemonSet)
 	return nodeToDaemonPods, nil
 }
 
+// resolveControllerRef returns the controller referenced by a ControllerRef,
+// or nil if the ControllerRef could not be resolved to a matching controller
+// of the correct Kind.
+func (dsc *DaemonSetsController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *extensions.DaemonSet {
+	// We can't look up by UID, so look up by Name and then verify UID.
+	// Don't even try to look up by Name if it's the wrong Kind.
+	if controllerRef.Kind != controllerKind.Kind {
+		return nil
+	}
+	ds, err := dsc.dsLister.DaemonSets(namespace).Get(controllerRef.Name)
+	if err != nil {
+		return nil
+	}
+	if ds.UID != controllerRef.UID {
+		// The controller we found with this Name is not the same one that the
+		// ControllerRef points to.
+		return nil
+	}
+	return ds
+}
+
 func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet, nodeToDaemonPods map[string][]*v1.Pod) error {
 	// For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon
 	// pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node.

From e2deb1795d12d48a39b1514838672402ba1ab5a1 Mon Sep 17 00:00:00 2001
From: Anthony Yeh
Date: Tue, 7 Mar 2017 10:02:38 -0800
Subject: [PATCH 09/10] DaemonSet: Mark daemonset-lookup-cache-size flag as deprecated.

---
 cmd/kube-controller-manager/app/options/options.go | 2 ++
 pkg/apis/componentconfig/types.go | 1 +
 2 files changed, 3 insertions(+)

diff --git a/cmd/kube-controller-manager/app/options/options.go b/cmd/kube-controller-manager/app/options/options.go
index 4738d56b3af..126d1d7af90 100644
--- a/cmd/kube-controller-manager/app/options/options.go
+++ b/cmd/kube-controller-manager/app/options/options.go
@@ -134,7 +134,9 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet, allControllers []string, disabled
 	fs.Int32Var(&s.ConcurrentSATokenSyncs, "concurrent-serviceaccount-token-syncs", s.ConcurrentSATokenSyncs, "The number of service account token objects that are allowed to sync concurrently. Larger number = more responsive token generation, but more CPU (and network) load")
 	fs.Int32Var(&s.LookupCacheSizeForRC, "replication-controller-lookup-cache-size", s.LookupCacheSizeForRC, "This flag is deprecated and will be removed in future releases. ReplicationController no longer requires a lookup cache.")
 	fs.Int32Var(&s.LookupCacheSizeForRS, "replicaset-lookup-cache-size", s.LookupCacheSizeForRS, "This flag is deprecated and will be removed in future releases. ReplicaSet no longer requires a lookup cache.")
+	// TODO: Remove the following flag 6 months after v1.6.0 is released.
 	fs.Int32Var(&s.LookupCacheSizeForDaemonSet, "daemonset-lookup-cache-size", s.LookupCacheSizeForDaemonSet, "This flag is deprecated and will be removed in future releases. DaemonSet no longer requires a lookup cache.")
+	fs.MarkDeprecated("daemonset-lookup-cache-size", "This flag is deprecated and will be removed in future releases. DaemonSet no longer requires a lookup cache.")
 	fs.DurationVar(&s.ServiceSyncPeriod.Duration, "service-sync-period", s.ServiceSyncPeriod.Duration, "The period for syncing services with their external load balancers")
 	fs.DurationVar(&s.NodeSyncPeriod.Duration, "node-sync-period", 0, ""+
 		"This flag is deprecated and will be removed in future releases. See node-monitor-period for Node health checking or "+
diff --git a/pkg/apis/componentconfig/types.go b/pkg/apis/componentconfig/types.go
index af3812e2c47..285044d28b9 100644
--- a/pkg/apis/componentconfig/types.go
+++ b/pkg/apis/componentconfig/types.go
@@ -688,6 +688,7 @@ type KubeControllerManagerConfiguration struct {
 	LookupCacheSizeForRS int32
 	// lookupCacheSizeForDaemonSet is the size of lookup cache for daemonsets.
 	// Larger number = more responsive daemonset, but more MEM load.
+	// DEPRECATED: This is no longer used.
 	LookupCacheSizeForDaemonSet int32
 	// serviceSyncPeriod is the period for syncing services with their external
 	// load balancers.

From fac372d0905defdd32e25da1bdeb446dce15bf9c Mon Sep 17 00:00:00 2001
From: Anthony Yeh
Date: Tue, 7 Mar 2017 15:01:11 -0800
Subject: [PATCH 10/10] DaemonSet: Relist Pods before each phase of sync.

The design of DaemonSet requires a relist before each phase (manage,
update, status), because the controller runs all phases in a single
sync pass instead of short-circuiting and requeuing after each action
it triggers. Without a relist, a later phase would act on stale Pod
data gathered before the previous phase's changes.

---
 pkg/controller/daemon/daemoncontroller.go | 40 +++++++++++--------
 .../daemon/daemoncontroller_test.go | 26 ++++++++++++
 pkg/controller/daemon/update.go | 7 +++-
 3 files changed, 55 insertions(+), 18 deletions(-)

diff --git a/pkg/controller/daemon/daemoncontroller.go b/pkg/controller/daemon/daemoncontroller.go
index 419ecdbcd6f..8d86376593c 100644
--- a/pkg/controller/daemon/daemoncontroller.go
+++ b/pkg/controller/daemon/daemoncontroller.go
@@ -504,7 +504,13 @@ func (dsc *DaemonSetsController) resolveControllerRef(namespace string, controll
 	return ds
 }
 
-func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet, nodeToDaemonPods map[string][]*v1.Pod) error {
+func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) error {
+	// Find out which nodes are running the daemon pods controlled by ds.
+	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
+	if err != nil {
+		return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
+	}
+
 	// For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon
 	// pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node.
 	nodeList, err := dsc.nodeLister.List(labels.Everything())
@@ -682,8 +688,12 @@ func storeDaemonSetStatus(dsClient unversionedextensions.DaemonSetInterface, ds
 	return updateErr
 }
 
-func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *extensions.DaemonSet, nodeToDaemonPods map[string][]*v1.Pod) error {
+func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *extensions.DaemonSet) error {
 	glog.V(4).Infof("Updating daemon set status")
+	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
+	if err != nil {
+		return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
+	}
 
 	nodeList, err := dsc.nodeLister.List(labels.Everything())
 	if err != nil {
@@ -760,12 +770,6 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
 		return nil
 	}
 
-	// Find out which nodes are running the daemon pods controlled by ds.
-	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
-	if err != nil {
-		return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
-	}
-
 	// Don't process a daemon set until all its creations and deletions have been processed.
 	// For example if daemon set foo asked for 3 new daemon pods in the previous call to manage,
 	// then we do not want to call manage on foo until the daemon pods have been created.
@@ -773,25 +777,27 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
 	if err != nil {
 		return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
 	}
-	dsNeedsSync := dsc.expectations.SatisfiedExpectations(dsKey)
-	if dsNeedsSync && ds.DeletionTimestamp == nil {
-		if err := dsc.manage(ds, nodeToDaemonPods); err != nil {
-			return err
-		}
+	if ds.DeletionTimestamp != nil || !dsc.expectations.SatisfiedExpectations(dsKey) {
+		// Only update status.
+		return dsc.updateDaemonSetStatus(ds)
 	}
 
-	dsNeedsSync = dsc.expectations.SatisfiedExpectations(dsKey)
-	if dsNeedsSync && ds.DeletionTimestamp == nil {
+	if err := dsc.manage(ds); err != nil {
+		return err
+	}
+
+	// Process rolling updates if we're ready.
+	if dsc.expectations.SatisfiedExpectations(dsKey) {
 		switch ds.Spec.UpdateStrategy.Type {
 		case extensions.RollingUpdateDaemonSetStrategyType:
-			err = dsc.rollingUpdate(ds, nodeToDaemonPods)
+			err = dsc.rollingUpdate(ds)
 		}
 		if err != nil {
 			return err
 		}
 	}
 
-	return dsc.updateDaemonSetStatus(ds, nodeToDaemonPods)
+	return dsc.updateDaemonSetStatus(ds)
 }
 
 // nodeShouldRunDaemonPod checks a set of preconditions against a (node,daemonset) and returns a
diff --git a/pkg/controller/daemon/daemoncontroller_test.go b/pkg/controller/daemon/daemoncontroller_test.go
index b41ba545840..120a6aa0de9 100644
--- a/pkg/controller/daemon/daemoncontroller_test.go
+++ b/pkg/controller/daemon/daemoncontroller_test.go
@@ -346,6 +346,32 @@ func TestSimpleDaemonSetLaunchesPods(t *testing.T) {
 	syncAndValidateDaemonSets(t, manager, ds, podControl, 5, 0)
 }
 
+func TestSimpleDaemonSetUpdatesStatusAfterLaunchingPods(t *testing.T) {
+	manager, podControl, clientset := newTestController()
+
+	var updated *extensions.DaemonSet
+	clientset.PrependReactor("update", "daemonsets", func(action core.Action) (handled bool, ret runtime.Object, err error) {
+		if action.GetSubresource() != "status" {
+			return false, nil, nil
+		}
+		if u, ok := action.(core.UpdateAction); ok {
+			updated = u.GetObject().(*extensions.DaemonSet)
+		}
+		return false, nil, nil
+	})
+
+	ds := newDaemonSet("foo")
+	manager.dsStore.Add(ds)
+	addNodes(manager.nodeStore, 0, 5, nil)
+	syncAndValidateDaemonSets(t, manager, ds, podControl, 5, 0)
+
+	// Make sure the single sync() updated Status already for the change made
+	// during the manage() phase.
+	if got, want := updated.Status.CurrentNumberScheduled, int32(5); got != want {
+		t.Errorf("Status.CurrentNumberScheduled = %v, want %v", got, want)
+	}
+}
+
 // DaemonSets should do nothing if there aren't any nodes
 func TestNoNodesDoesNothing(t *testing.T) {
 	manager, podControl, _ := newTestController()
diff --git a/pkg/controller/daemon/update.go b/pkg/controller/daemon/update.go
index 44895a54a08..56645b4eb14 100644
--- a/pkg/controller/daemon/update.go
+++ b/pkg/controller/daemon/update.go
@@ -31,7 +31,12 @@ import (
 
 // rollingUpdate deletes old daemon set pods making sure that no more than
 // ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable pods are unavailable
-func (dsc *DaemonSetsController) rollingUpdate(ds *extensions.DaemonSet, nodeToDaemonPods map[string][]*v1.Pod) error {
+func (dsc *DaemonSetsController) rollingUpdate(ds *extensions.DaemonSet) error {
+	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
+	if err != nil {
+		return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
+	}
+
 	_, oldPods, err := dsc.getAllDaemonSetPods(ds, nodeToDaemonPods)
 	maxUnavailable, numUnavailable, err := dsc.getUnavailableNumbers(ds, nodeToDaemonPods)
 	if err != nil {