mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-03 17:30:00 +00:00
DaemonSet: Use ControllerRef to route watch events.
This is part of the completion of ControllerRef, as described here: https://github.com/kubernetes/community/blob/master/contributors/design-proposals/controller-ref.md#watches
This commit is contained in:
parent
421e0bbd83
commit
1099811833
@ -36,7 +36,6 @@ func startDaemonSetController(ctx ControllerContext) (bool, error) {
|
|||||||
ctx.InformerFactory.Core().V1().Pods(),
|
ctx.InformerFactory.Core().V1().Pods(),
|
||||||
ctx.InformerFactory.Core().V1().Nodes(),
|
ctx.InformerFactory.Core().V1().Nodes(),
|
||||||
ctx.ClientBuilder.ClientOrDie("daemon-set-controller"),
|
ctx.ClientBuilder.ClientOrDie("daemon-set-controller"),
|
||||||
int(ctx.Options.LookupCacheSizeForDaemonSet),
|
|
||||||
).Run(int(ctx.Options.ConcurrentDaemonSetSyncs), ctx.Stop)
|
).Run(int(ctx.Options.ConcurrentDaemonSetSyncs), ctx.Stop)
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
@ -134,7 +134,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet, allControllers []string, disabled
|
|||||||
fs.Int32Var(&s.ConcurrentSATokenSyncs, "concurrent-serviceaccount-token-syncs", s.ConcurrentSATokenSyncs, "The number of service account token objects that are allowed to sync concurrently. Larger number = more responsive token generation, but more CPU (and network) load")
|
fs.Int32Var(&s.ConcurrentSATokenSyncs, "concurrent-serviceaccount-token-syncs", s.ConcurrentSATokenSyncs, "The number of service account token objects that are allowed to sync concurrently. Larger number = more responsive token generation, but more CPU (and network) load")
|
||||||
fs.Int32Var(&s.LookupCacheSizeForRC, "replication-controller-lookup-cache-size", s.LookupCacheSizeForRC, "This flag is deprecated and will be removed in future releases. ReplicationController no longer requires a lookup cache.")
|
fs.Int32Var(&s.LookupCacheSizeForRC, "replication-controller-lookup-cache-size", s.LookupCacheSizeForRC, "This flag is deprecated and will be removed in future releases. ReplicationController no longer requires a lookup cache.")
|
||||||
fs.Int32Var(&s.LookupCacheSizeForRS, "replicaset-lookup-cache-size", s.LookupCacheSizeForRS, "This flag is deprecated and will be removed in future releases. ReplicaSet no longer requires a lookup cache.")
|
fs.Int32Var(&s.LookupCacheSizeForRS, "replicaset-lookup-cache-size", s.LookupCacheSizeForRS, "This flag is deprecated and will be removed in future releases. ReplicaSet no longer requires a lookup cache.")
|
||||||
fs.Int32Var(&s.LookupCacheSizeForDaemonSet, "daemonset-lookup-cache-size", s.LookupCacheSizeForDaemonSet, "The the size of lookup cache for daemonsets. Larger number = more responsive daemonsets, but more MEM load.")
|
fs.Int32Var(&s.LookupCacheSizeForDaemonSet, "daemonset-lookup-cache-size", s.LookupCacheSizeForDaemonSet, "This flag is deprecated and will be removed in future releases. DaemonSet no longer requires a lookup cache.")
|
||||||
fs.DurationVar(&s.ServiceSyncPeriod.Duration, "service-sync-period", s.ServiceSyncPeriod.Duration, "The period for syncing services with their external load balancers")
|
fs.DurationVar(&s.ServiceSyncPeriod.Duration, "service-sync-period", s.ServiceSyncPeriod.Duration, "The period for syncing services with their external load balancers")
|
||||||
fs.DurationVar(&s.NodeSyncPeriod.Duration, "node-sync-period", 0, ""+
|
fs.DurationVar(&s.NodeSyncPeriod.Duration, "node-sync-period", 0, ""+
|
||||||
"This flag is deprecated and will be removed in future releases. See node-monitor-period for Node health checking or "+
|
"This flag is deprecated and will be removed in future releases. See node-monitor-period for Node health checking or "+
|
||||||
|
@ -79,6 +79,7 @@ go_test(
|
|||||||
"//vendor:k8s.io/client-go/testing",
|
"//vendor:k8s.io/client-go/testing",
|
||||||
"//vendor:k8s.io/client-go/tools/cache",
|
"//vendor:k8s.io/client-go/tools/cache",
|
||||||
"//vendor:k8s.io/client-go/tools/record",
|
"//vendor:k8s.io/client-go/tools/record",
|
||||||
|
"//vendor:k8s.io/client-go/util/workqueue",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -108,13 +108,11 @@ type DaemonSetsController struct {
|
|||||||
// Added as a member to the struct to allow injection for testing.
|
// Added as a member to the struct to allow injection for testing.
|
||||||
nodeStoreSynced cache.InformerSynced
|
nodeStoreSynced cache.InformerSynced
|
||||||
|
|
||||||
lookupCache *controller.MatchingCache
|
|
||||||
|
|
||||||
// DaemonSet keys that need to be synced.
|
// DaemonSet keys that need to be synced.
|
||||||
queue workqueue.RateLimitingInterface
|
queue workqueue.RateLimitingInterface
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewDaemonSetsController(daemonSetInformer extensionsinformers.DaemonSetInformer, podInformer coreinformers.PodInformer, nodeInformer coreinformers.NodeInformer, kubeClient clientset.Interface, lookupCacheSize int) *DaemonSetsController {
|
func NewDaemonSetsController(daemonSetInformer extensionsinformers.DaemonSetInformer, podInformer coreinformers.PodInformer, nodeInformer coreinformers.NodeInformer, kubeClient clientset.Interface) *DaemonSetsController {
|
||||||
eventBroadcaster := record.NewBroadcaster()
|
eventBroadcaster := record.NewBroadcaster()
|
||||||
eventBroadcaster.StartLogging(glog.Infof)
|
eventBroadcaster.StartLogging(glog.Infof)
|
||||||
// TODO: remove the wrapper when every clients have moved to use the clientset.
|
// TODO: remove the wrapper when every clients have moved to use the clientset.
|
||||||
@ -144,21 +142,6 @@ func NewDaemonSetsController(daemonSetInformer extensionsinformers.DaemonSetInfo
|
|||||||
UpdateFunc: func(old, cur interface{}) {
|
UpdateFunc: func(old, cur interface{}) {
|
||||||
oldDS := old.(*extensions.DaemonSet)
|
oldDS := old.(*extensions.DaemonSet)
|
||||||
curDS := cur.(*extensions.DaemonSet)
|
curDS := cur.(*extensions.DaemonSet)
|
||||||
// We should invalidate the whole lookup cache if a DS's selector has been updated.
|
|
||||||
//
|
|
||||||
// Imagine that you have two RSs:
|
|
||||||
// * old DS1
|
|
||||||
// * new DS2
|
|
||||||
// You also have a pod that is attached to DS2 (because it doesn't match DS1 selector).
|
|
||||||
// Now imagine that you are changing DS1 selector so that it is now matching that pod,
|
|
||||||
// in such case we must invalidate the whole cache so that pod could be adopted by DS1
|
|
||||||
//
|
|
||||||
// This makes the lookup cache less helpful, but selector update does not happen often,
|
|
||||||
// so it's not a big problem
|
|
||||||
if !reflect.DeepEqual(oldDS.Spec.Selector, curDS.Spec.Selector) {
|
|
||||||
dsc.lookupCache.InvalidateAll()
|
|
||||||
}
|
|
||||||
|
|
||||||
glog.V(4).Infof("Updating daemon set %s", oldDS.Name)
|
glog.V(4).Infof("Updating daemon set %s", oldDS.Name)
|
||||||
dsc.enqueueDaemonSet(curDS)
|
dsc.enqueueDaemonSet(curDS)
|
||||||
},
|
},
|
||||||
@ -187,7 +170,6 @@ func NewDaemonSetsController(daemonSetInformer extensionsinformers.DaemonSetInfo
|
|||||||
|
|
||||||
dsc.syncHandler = dsc.syncDaemonSet
|
dsc.syncHandler = dsc.syncDaemonSet
|
||||||
dsc.enqueueDaemonSet = dsc.enqueue
|
dsc.enqueueDaemonSet = dsc.enqueue
|
||||||
dsc.lookupCache = controller.NewMatchingCache(lookupCacheSize)
|
|
||||||
return dsc
|
return dsc
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -276,78 +258,57 @@ func (dsc *DaemonSetsController) enqueueDaemonSetAfter(obj interface{}, after ti
|
|||||||
dsc.queue.AddAfter(key, after)
|
dsc.queue.AddAfter(key, after)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dsc *DaemonSetsController) getPodDaemonSet(pod *v1.Pod) *extensions.DaemonSet {
|
// getPodDaemonSets returns a list of DaemonSets that potentially match the pod.
|
||||||
// look up in the cache, if cached and the cache is valid, just return cached value
|
func (dsc *DaemonSetsController) getPodDaemonSets(pod *v1.Pod) []*extensions.DaemonSet {
|
||||||
if obj, cached := dsc.lookupCache.GetMatchingObject(pod); cached {
|
|
||||||
ds, ok := obj.(*extensions.DaemonSet)
|
|
||||||
if !ok {
|
|
||||||
// This should not happen
|
|
||||||
utilruntime.HandleError(fmt.Errorf("lookup cache does not return a DaemonSet object"))
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if dsc.isCacheValid(pod, ds) {
|
|
||||||
return ds
|
|
||||||
}
|
|
||||||
}
|
|
||||||
sets, err := dsc.dsLister.GetPodDaemonSets(pod)
|
sets, err := dsc.dsLister.GetPodDaemonSets(pod)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(4).Infof("No daemon sets found for pod %v, daemon set controller will avoid syncing", pod.Name)
|
glog.V(4).Infof("No daemon sets found for pod %v, daemon set controller will avoid syncing", pod.Name)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if len(sets) > 1 {
|
if len(sets) > 1 {
|
||||||
// More than two items in this list indicates user error. If two daemon
|
// ControllerRef will ensure we don't do anythign crazy, but more than one
|
||||||
// sets overlap, sort by creation timestamp, subsort by name, then pick
|
// item in this list nevertheless constitutes user error.
|
||||||
// the first.
|
|
||||||
utilruntime.HandleError(fmt.Errorf("user error! more than one daemon is selecting pods with labels: %+v", pod.Labels))
|
utilruntime.HandleError(fmt.Errorf("user error! more than one daemon is selecting pods with labels: %+v", pod.Labels))
|
||||||
sort.Sort(byCreationTimestamp(sets))
|
|
||||||
}
|
}
|
||||||
|
return sets
|
||||||
// update lookup cache
|
|
||||||
dsc.lookupCache.Update(pod, sets[0])
|
|
||||||
|
|
||||||
return sets[0]
|
|
||||||
}
|
|
||||||
|
|
||||||
// isCacheValid check if the cache is valid
|
|
||||||
func (dsc *DaemonSetsController) isCacheValid(pod *v1.Pod, cachedDS *extensions.DaemonSet) bool {
|
|
||||||
_, err := dsc.dsLister.DaemonSets(cachedDS.Namespace).Get(cachedDS.Name)
|
|
||||||
// ds has been deleted or updated, cache is invalid
|
|
||||||
if err != nil || !isDaemonSetMatch(pod, cachedDS) {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
// isDaemonSetMatch take a Pod and DaemonSet, return whether the Pod and DaemonSet are matching
|
|
||||||
// TODO(mqliang): This logic is a copy from GetPodDaemonSets(), remove the duplication
|
|
||||||
func isDaemonSetMatch(pod *v1.Pod, ds *extensions.DaemonSet) bool {
|
|
||||||
if ds.Namespace != pod.Namespace {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
selector, err := metav1.LabelSelectorAsSelector(ds.Spec.Selector)
|
|
||||||
if err != nil {
|
|
||||||
err = fmt.Errorf("invalid selector: %v", err)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// If a ReplicaSet with a nil or empty selector creeps in, it should match nothing, not everything.
|
|
||||||
if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dsc *DaemonSetsController) addPod(obj interface{}) {
|
func (dsc *DaemonSetsController) addPod(obj interface{}) {
|
||||||
pod := obj.(*v1.Pod)
|
pod := obj.(*v1.Pod)
|
||||||
glog.V(4).Infof("Pod %s added.", pod.Name)
|
glog.V(4).Infof("Pod %s added.", pod.Name)
|
||||||
if ds := dsc.getPodDaemonSet(pod); ds != nil {
|
|
||||||
|
if pod.DeletionTimestamp != nil {
|
||||||
|
// on a restart of the controller manager, it's possible a new pod shows up in a state that
|
||||||
|
// is already pending deletion. Prevent the pod from being a creation observation.
|
||||||
|
dsc.deletePod(pod)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// If it has a ControllerRef, that's all that matters.
|
||||||
|
if controllerRef := controller.GetControllerOf(pod); controllerRef != nil {
|
||||||
|
if controllerRef.Kind != controllerKind.Kind {
|
||||||
|
// It's controlled by a different type of controller.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ds, err := dsc.dsLister.DaemonSets(pod.Namespace).Get(controllerRef.Name)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
dsKey, err := controller.KeyFunc(ds)
|
dsKey, err := controller.KeyFunc(ds)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", ds, err))
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
dsc.expectations.CreationObserved(dsKey)
|
dsc.expectations.CreationObserved(dsKey)
|
||||||
dsc.enqueueDaemonSet(ds)
|
dsc.enqueueDaemonSet(ds)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Otherwise, it's an orphan. Get a list of all matching DaemonSets and sync
|
||||||
|
// them to see if anyone wants to adopt it.
|
||||||
|
// DO NOT observe creation because no controller should be waiting for an
|
||||||
|
// orphan.
|
||||||
|
for _, ds := range dsc.getPodDaemonSets(pod) {
|
||||||
|
dsc.enqueueDaemonSet(ds)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -364,22 +325,43 @@ func (dsc *DaemonSetsController) updatePod(old, cur interface{}) {
|
|||||||
}
|
}
|
||||||
glog.V(4).Infof("Pod %s updated.", curPod.Name)
|
glog.V(4).Infof("Pod %s updated.", curPod.Name)
|
||||||
changedToReady := !v1.IsPodReady(oldPod) && v1.IsPodReady(curPod)
|
changedToReady := !v1.IsPodReady(oldPod) && v1.IsPodReady(curPod)
|
||||||
if curDS := dsc.getPodDaemonSet(curPod); curDS != nil {
|
labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
|
||||||
dsc.enqueueDaemonSet(curDS)
|
|
||||||
|
|
||||||
|
curControllerRef := controller.GetControllerOf(curPod)
|
||||||
|
oldControllerRef := controller.GetControllerOf(oldPod)
|
||||||
|
controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
|
||||||
|
if controllerRefChanged &&
|
||||||
|
oldControllerRef != nil && oldControllerRef.Kind == controllerKind.Kind {
|
||||||
|
// The ControllerRef was changed. Sync the old controller, if any.
|
||||||
|
ds, err := dsc.dsLister.DaemonSets(oldPod.Namespace).Get(oldControllerRef.Name)
|
||||||
|
if err == nil {
|
||||||
|
dsc.enqueueDaemonSet(ds)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If it has a ControllerRef, that's all that matters.
|
||||||
|
if curControllerRef != nil {
|
||||||
|
if curControllerRef.Kind != controllerKind.Kind {
|
||||||
|
// It's controlled by a different type of controller.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ds, err := dsc.dsLister.DaemonSets(curPod.Namespace).Get(curControllerRef.Name)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
dsc.enqueueDaemonSet(ds)
|
||||||
// See https://github.com/kubernetes/kubernetes/pull/38076 for more details
|
// See https://github.com/kubernetes/kubernetes/pull/38076 for more details
|
||||||
if changedToReady && curDS.Spec.MinReadySeconds > 0 {
|
if changedToReady && ds.Spec.MinReadySeconds > 0 {
|
||||||
dsc.enqueueDaemonSetAfter(curDS, time.Duration(curDS.Spec.MinReadySeconds)*time.Second)
|
dsc.enqueueDaemonSetAfter(ds, time.Duration(ds.Spec.MinReadySeconds)*time.Second)
|
||||||
}
|
}
|
||||||
|
return
|
||||||
}
|
}
|
||||||
// If the labels have not changed, then the daemon set responsible for
|
|
||||||
// the pod is the same as it was before. In that case we have enqueued the daemon
|
// Otherwise, it's an orphan. If anything changed, sync matching controllers
|
||||||
// set above, and do not have to enqueue the set again.
|
// to see if anyone wants to adopt it now.
|
||||||
if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) {
|
if labelChanged || controllerRefChanged {
|
||||||
// It's ok if both oldDS and curDS are the same, because curDS will set
|
for _, ds := range dsc.getPodDaemonSets(curPod) {
|
||||||
// the expectations on its run so oldDS will have no effect.
|
dsc.enqueueDaemonSet(ds)
|
||||||
if oldDS := dsc.getPodDaemonSet(oldPod); oldDS != nil {
|
|
||||||
dsc.enqueueDaemonSet(oldDS)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -394,25 +376,37 @@ func (dsc *DaemonSetsController) deletePod(obj interface{}) {
|
|||||||
if !ok {
|
if !ok {
|
||||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||||
if !ok {
|
if !ok {
|
||||||
utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
|
utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
pod, ok = tombstone.Obj.(*v1.Pod)
|
pod, ok = tombstone.Obj.(*v1.Pod)
|
||||||
if !ok {
|
if !ok {
|
||||||
utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %#v", obj))
|
utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
glog.V(4).Infof("Pod %s deleted.", pod.Name)
|
glog.V(4).Infof("Pod %s deleted.", pod.Name)
|
||||||
if ds := dsc.getPodDaemonSet(pod); ds != nil {
|
|
||||||
|
controllerRef := controller.GetControllerOf(pod)
|
||||||
|
if controllerRef == nil {
|
||||||
|
// No controller should care about orphans being deleted.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if controllerRef.Kind != controllerKind.Kind {
|
||||||
|
// It's controlled by a different type of controller.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ds, err := dsc.dsLister.DaemonSets(pod.Namespace).Get(controllerRef.Name)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
dsKey, err := controller.KeyFunc(ds)
|
dsKey, err := controller.KeyFunc(ds)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", ds, err))
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
dsc.expectations.DeletionObserved(dsKey)
|
dsc.expectations.DeletionObserved(dsKey)
|
||||||
dsc.enqueueDaemonSet(ds)
|
dsc.enqueueDaemonSet(ds)
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dsc *DaemonSetsController) addNode(obj interface{}) {
|
func (dsc *DaemonSetsController) addNode(obj interface{}) {
|
||||||
@ -481,7 +475,7 @@ func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *extensions.DaemonSet)
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
// Use ControllerRefManager to adopt/orphan as needed.
|
// Use ControllerRefManager to adopt/orphan as needed.
|
||||||
cm := controller.NewPodControllerRefManager(dsc.podControl, ds, selector, ControllerKind)
|
cm := controller.NewPodControllerRefManager(dsc.podControl, ds, selector, controllerKind)
|
||||||
claimedPods, err := cm.ClaimPods(pods)
|
claimedPods, err := cm.ClaimPods(pods)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -867,7 +861,7 @@ func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *exten
|
|||||||
}
|
}
|
||||||
// ignore pods that belong to the daemonset when taking into account whether
|
// ignore pods that belong to the daemonset when taking into account whether
|
||||||
// a daemonset should bind to a node.
|
// a daemonset should bind to a node.
|
||||||
if pds := dsc.getPodDaemonSet(pod); pds != nil && ds.Name == pds.Name {
|
if controllerRef := controller.GetControllerOf(pod); controllerRef != nil && controllerRef.UID == ds.UID {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
pods = append(pods, pod)
|
pods = append(pods, pod)
|
||||||
@ -975,8 +969,8 @@ func Predicates(pod *v1.Pod, nodeInfo *schedulercache.NodeInfo) (bool, []algorit
|
|||||||
func newControllerRef(ds *extensions.DaemonSet) *metav1.OwnerReference {
|
func newControllerRef(ds *extensions.DaemonSet) *metav1.OwnerReference {
|
||||||
isController := true
|
isController := true
|
||||||
return &metav1.OwnerReference{
|
return &metav1.OwnerReference{
|
||||||
APIVersion: ControllerKind.GroupVersion().String(),
|
APIVersion: controllerKind.GroupVersion().String(),
|
||||||
Kind: ControllerKind.Kind,
|
Kind: controllerKind.Kind,
|
||||||
Name: ds.Name,
|
Name: ds.Name,
|
||||||
UID: ds.UID,
|
UID: ds.UID,
|
||||||
Controller: &isController,
|
Controller: &isController,
|
||||||
|
@ -18,6 +18,9 @@ package daemon
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"reflect"
|
||||||
|
"sort"
|
||||||
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
@ -30,6 +33,7 @@ import (
|
|||||||
core "k8s.io/client-go/testing"
|
core "k8s.io/client-go/testing"
|
||||||
"k8s.io/client-go/tools/cache"
|
"k8s.io/client-go/tools/cache"
|
||||||
"k8s.io/client-go/tools/record"
|
"k8s.io/client-go/tools/record"
|
||||||
|
"k8s.io/client-go/util/workqueue"
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/api/testapi"
|
"k8s.io/kubernetes/pkg/api/testapi"
|
||||||
"k8s.io/kubernetes/pkg/api/v1"
|
"k8s.io/kubernetes/pkg/api/v1"
|
||||||
@ -248,7 +252,6 @@ func newTestController(initialObjects ...runtime.Object) (*daemonSetsController,
|
|||||||
informerFactory.Core().V1().Pods(),
|
informerFactory.Core().V1().Pods(),
|
||||||
informerFactory.Core().V1().Nodes(),
|
informerFactory.Core().V1().Nodes(),
|
||||||
clientset,
|
clientset,
|
||||||
0,
|
|
||||||
)
|
)
|
||||||
manager.eventRecorder = record.NewFakeRecorder(100)
|
manager.eventRecorder = record.NewFakeRecorder(100)
|
||||||
|
|
||||||
@ -542,16 +545,17 @@ func TestPortConflictWithSameDaemonPodDoesNotDeletePod(t *testing.T) {
|
|||||||
manager, podControl, _ := newTestController()
|
manager, podControl, _ := newTestController()
|
||||||
node := newNode("port-conflict", nil)
|
node := newNode("port-conflict", nil)
|
||||||
manager.nodeStore.Add(node)
|
manager.nodeStore.Add(node)
|
||||||
|
ds := newDaemonSet("foo")
|
||||||
|
ds.Spec.Template.Spec = podSpec
|
||||||
|
manager.dsStore.Add(ds)
|
||||||
manager.podStore.Add(&v1.Pod{
|
manager.podStore.Add(&v1.Pod{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
Labels: simpleDaemonSetLabel,
|
Labels: simpleDaemonSetLabel,
|
||||||
Namespace: metav1.NamespaceDefault,
|
Namespace: metav1.NamespaceDefault,
|
||||||
|
OwnerReferences: []metav1.OwnerReference{*newControllerRef(ds)},
|
||||||
},
|
},
|
||||||
Spec: podSpec,
|
Spec: podSpec,
|
||||||
})
|
})
|
||||||
ds := newDaemonSet("foo")
|
|
||||||
ds.Spec.Template.Spec = podSpec
|
|
||||||
manager.dsStore.Add(ds)
|
|
||||||
syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0)
|
syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1244,3 +1248,244 @@ func TestGetNodesToDaemonPods(t *testing.T) {
|
|||||||
t.Errorf("unexpected pod %v was returned", podName)
|
t.Errorf("unexpected pod %v was returned", podName)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAddPod(t *testing.T) {
|
||||||
|
manager, _, _ := newTestController()
|
||||||
|
ds1 := newDaemonSet("foo1")
|
||||||
|
ds2 := newDaemonSet("foo2")
|
||||||
|
manager.dsStore.Add(ds1)
|
||||||
|
manager.dsStore.Add(ds2)
|
||||||
|
|
||||||
|
pod1 := newPod("pod1-", "node-0", simpleDaemonSetLabel, ds1)
|
||||||
|
manager.addPod(pod1)
|
||||||
|
if got, want := manager.queue.Len(), 1; got != want {
|
||||||
|
t.Fatalf("queue.Len() = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
key, done := manager.queue.Get()
|
||||||
|
if key == nil || done {
|
||||||
|
t.Fatalf("failed to enqueue controller for pod %v", pod1.Name)
|
||||||
|
}
|
||||||
|
expectedKey, _ := controller.KeyFunc(ds1)
|
||||||
|
if got, want := key.(string), expectedKey; got != want {
|
||||||
|
t.Errorf("queue.Get() = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
|
||||||
|
pod2 := newPod("pod2-", "node-0", simpleDaemonSetLabel, ds2)
|
||||||
|
manager.addPod(pod2)
|
||||||
|
if got, want := manager.queue.Len(), 1; got != want {
|
||||||
|
t.Fatalf("queue.Len() = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
key, done = manager.queue.Get()
|
||||||
|
if key == nil || done {
|
||||||
|
t.Fatalf("failed to enqueue controller for pod %v", pod2.Name)
|
||||||
|
}
|
||||||
|
expectedKey, _ = controller.KeyFunc(ds2)
|
||||||
|
if got, want := key.(string), expectedKey; got != want {
|
||||||
|
t.Errorf("queue.Get() = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAddPodOrphan(t *testing.T) {
|
||||||
|
manager, _, _ := newTestController()
|
||||||
|
ds1 := newDaemonSet("foo1")
|
||||||
|
ds2 := newDaemonSet("foo2")
|
||||||
|
ds3 := newDaemonSet("foo3")
|
||||||
|
ds3.Spec.Selector.MatchLabels = simpleDaemonSetLabel2
|
||||||
|
manager.dsStore.Add(ds1)
|
||||||
|
manager.dsStore.Add(ds2)
|
||||||
|
manager.dsStore.Add(ds3)
|
||||||
|
|
||||||
|
// Make pod an orphan. Expect matching sets to be queued.
|
||||||
|
pod := newPod("pod1-", "node-0", simpleDaemonSetLabel, nil)
|
||||||
|
manager.addPod(pod)
|
||||||
|
if got, want := manager.queue.Len(), 2; got != want {
|
||||||
|
t.Fatalf("queue.Len() = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
if got, want := getQueuedKeys(manager.queue), []string{"default/foo1", "default/foo2"}; !reflect.DeepEqual(got, want) {
|
||||||
|
t.Errorf("getQueuedKeys() = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUpdatePod(t *testing.T) {
|
||||||
|
manager, _, _ := newTestController()
|
||||||
|
ds1 := newDaemonSet("foo1")
|
||||||
|
ds2 := newDaemonSet("foo2")
|
||||||
|
manager.dsStore.Add(ds1)
|
||||||
|
manager.dsStore.Add(ds2)
|
||||||
|
|
||||||
|
pod1 := newPod("pod1-", "node-0", simpleDaemonSetLabel, ds1)
|
||||||
|
prev := *pod1
|
||||||
|
bumpResourceVersion(pod1)
|
||||||
|
manager.updatePod(&prev, pod1)
|
||||||
|
if got, want := manager.queue.Len(), 1; got != want {
|
||||||
|
t.Fatalf("queue.Len() = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
key, done := manager.queue.Get()
|
||||||
|
if key == nil || done {
|
||||||
|
t.Fatalf("failed to enqueue controller for pod %v", pod1.Name)
|
||||||
|
}
|
||||||
|
expectedKey, _ := controller.KeyFunc(ds1)
|
||||||
|
if got, want := key.(string), expectedKey; got != want {
|
||||||
|
t.Errorf("queue.Get() = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
|
||||||
|
pod2 := newPod("pod2-", "node-0", simpleDaemonSetLabel, ds2)
|
||||||
|
prev = *pod2
|
||||||
|
bumpResourceVersion(pod2)
|
||||||
|
manager.updatePod(&prev, pod2)
|
||||||
|
if got, want := manager.queue.Len(), 1; got != want {
|
||||||
|
t.Fatalf("queue.Len() = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
key, done = manager.queue.Get()
|
||||||
|
if key == nil || done {
|
||||||
|
t.Fatalf("failed to enqueue controller for pod %v", pod2.Name)
|
||||||
|
}
|
||||||
|
expectedKey, _ = controller.KeyFunc(ds2)
|
||||||
|
if got, want := key.(string), expectedKey; got != want {
|
||||||
|
t.Errorf("queue.Get() = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUpdatePodOrphanSameLabels(t *testing.T) {
|
||||||
|
manager, _, _ := newTestController()
|
||||||
|
ds1 := newDaemonSet("foo1")
|
||||||
|
ds2 := newDaemonSet("foo2")
|
||||||
|
manager.dsStore.Add(ds1)
|
||||||
|
manager.dsStore.Add(ds2)
|
||||||
|
|
||||||
|
pod := newPod("pod1-", "node-0", simpleDaemonSetLabel, nil)
|
||||||
|
prev := *pod
|
||||||
|
bumpResourceVersion(pod)
|
||||||
|
manager.updatePod(&prev, pod)
|
||||||
|
if got, want := manager.queue.Len(), 0; got != want {
|
||||||
|
t.Fatalf("queue.Len() = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUpdatePodOrphanWithNewLabels(t *testing.T) {
|
||||||
|
manager, _, _ := newTestController()
|
||||||
|
ds1 := newDaemonSet("foo1")
|
||||||
|
ds2 := newDaemonSet("foo2")
|
||||||
|
manager.dsStore.Add(ds1)
|
||||||
|
manager.dsStore.Add(ds2)
|
||||||
|
|
||||||
|
pod := newPod("pod1-", "node-0", simpleDaemonSetLabel, nil)
|
||||||
|
prev := *pod
|
||||||
|
prev.Labels = map[string]string{"foo2": "bar2"}
|
||||||
|
bumpResourceVersion(pod)
|
||||||
|
manager.updatePod(&prev, pod)
|
||||||
|
if got, want := manager.queue.Len(), 2; got != want {
|
||||||
|
t.Fatalf("queue.Len() = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
if got, want := getQueuedKeys(manager.queue), []string{"default/foo1", "default/foo2"}; !reflect.DeepEqual(got, want) {
|
||||||
|
t.Errorf("getQueuedKeys() = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUpdatePodChangeControllerRef(t *testing.T) {
|
||||||
|
manager, _, _ := newTestController()
|
||||||
|
ds1 := newDaemonSet("foo1")
|
||||||
|
ds2 := newDaemonSet("foo2")
|
||||||
|
manager.dsStore.Add(ds1)
|
||||||
|
manager.dsStore.Add(ds2)
|
||||||
|
|
||||||
|
pod := newPod("pod1-", "node-0", simpleDaemonSetLabel, ds1)
|
||||||
|
prev := *pod
|
||||||
|
prev.OwnerReferences = []metav1.OwnerReference{*newControllerRef(ds2)}
|
||||||
|
bumpResourceVersion(pod)
|
||||||
|
manager.updatePod(&prev, pod)
|
||||||
|
if got, want := manager.queue.Len(), 2; got != want {
|
||||||
|
t.Fatalf("queue.Len() = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUpdatePodControllerRefRemoved(t *testing.T) {
|
||||||
|
manager, _, _ := newTestController()
|
||||||
|
ds1 := newDaemonSet("foo1")
|
||||||
|
ds2 := newDaemonSet("foo2")
|
||||||
|
manager.dsStore.Add(ds1)
|
||||||
|
manager.dsStore.Add(ds2)
|
||||||
|
|
||||||
|
pod := newPod("pod1-", "node-0", simpleDaemonSetLabel, ds1)
|
||||||
|
prev := *pod
|
||||||
|
pod.OwnerReferences = nil
|
||||||
|
bumpResourceVersion(pod)
|
||||||
|
manager.updatePod(&prev, pod)
|
||||||
|
if got, want := manager.queue.Len(), 2; got != want {
|
||||||
|
t.Fatalf("queue.Len() = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDeletePod(t *testing.T) {
|
||||||
|
manager, _, _ := newTestController()
|
||||||
|
ds1 := newDaemonSet("foo1")
|
||||||
|
ds2 := newDaemonSet("foo2")
|
||||||
|
manager.dsStore.Add(ds1)
|
||||||
|
manager.dsStore.Add(ds2)
|
||||||
|
|
||||||
|
pod1 := newPod("pod1-", "node-0", simpleDaemonSetLabel, ds1)
|
||||||
|
manager.deletePod(pod1)
|
||||||
|
if got, want := manager.queue.Len(), 1; got != want {
|
||||||
|
t.Fatalf("queue.Len() = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
key, done := manager.queue.Get()
|
||||||
|
if key == nil || done {
|
||||||
|
t.Fatalf("failed to enqueue controller for pod %v", pod1.Name)
|
||||||
|
}
|
||||||
|
expectedKey, _ := controller.KeyFunc(ds1)
|
||||||
|
if got, want := key.(string), expectedKey; got != want {
|
||||||
|
t.Errorf("queue.Get() = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
|
||||||
|
pod2 := newPod("pod2-", "node-0", simpleDaemonSetLabel, ds2)
|
||||||
|
manager.deletePod(pod2)
|
||||||
|
if got, want := manager.queue.Len(), 1; got != want {
|
||||||
|
t.Fatalf("queue.Len() = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
key, done = manager.queue.Get()
|
||||||
|
if key == nil || done {
|
||||||
|
t.Fatalf("failed to enqueue controller for pod %v", pod2.Name)
|
||||||
|
}
|
||||||
|
expectedKey, _ = controller.KeyFunc(ds2)
|
||||||
|
if got, want := key.(string), expectedKey; got != want {
|
||||||
|
t.Errorf("queue.Get() = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDeletePodOrphan(t *testing.T) {
|
||||||
|
manager, _, _ := newTestController()
|
||||||
|
ds1 := newDaemonSet("foo1")
|
||||||
|
ds2 := newDaemonSet("foo2")
|
||||||
|
ds3 := newDaemonSet("foo3")
|
||||||
|
ds3.Spec.Selector.MatchLabels = simpleDaemonSetLabel2
|
||||||
|
manager.dsStore.Add(ds1)
|
||||||
|
manager.dsStore.Add(ds2)
|
||||||
|
manager.dsStore.Add(ds3)
|
||||||
|
|
||||||
|
pod := newPod("pod1-", "node-0", simpleDaemonSetLabel, nil)
|
||||||
|
manager.deletePod(pod)
|
||||||
|
if got, want := manager.queue.Len(), 0; got != want {
|
||||||
|
t.Fatalf("queue.Len() = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func bumpResourceVersion(obj metav1.Object) {
|
||||||
|
ver, _ := strconv.ParseInt(obj.GetResourceVersion(), 10, 32)
|
||||||
|
obj.SetResourceVersion(strconv.FormatInt(ver+1, 10))
|
||||||
|
}
|
||||||
|
|
||||||
|
// getQueuedKeys returns a sorted list of keys in the queue.
|
||||||
|
// It can be used to quickly check that multiple keys are in there.
|
||||||
|
func getQueuedKeys(queue workqueue.RateLimitingInterface) []string {
|
||||||
|
var keys []string
|
||||||
|
count := queue.Len()
|
||||||
|
for i := 0; i < count; i++ {
|
||||||
|
key, done := queue.Get()
|
||||||
|
if done {
|
||||||
|
return keys
|
||||||
|
}
|
||||||
|
keys = append(keys, key.(string))
|
||||||
|
}
|
||||||
|
sort.Strings(keys)
|
||||||
|
return keys
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user