mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 19:01:49 +00:00
Merge pull request #113622 from 249043822/br-context-logging-daemon
daemonset: use contextual logging
This commit is contained in:
commit
f769c66aa8
@ -34,7 +34,9 @@ import (
|
||||
)
|
||||
|
||||
func startDaemonSetController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "daemonset-controller"))
|
||||
dsc, err := daemon.NewDaemonSetsController(
|
||||
ctx,
|
||||
controllerContext.InformerFactory.Apps().V1().DaemonSets(),
|
||||
controllerContext.InformerFactory.Apps().V1().ControllerRevisions(),
|
||||
controllerContext.InformerFactory.Core().V1().Pods(),
|
||||
|
@ -130,6 +130,7 @@ type DaemonSetsController struct {
|
||||
|
||||
// NewDaemonSetsController creates a new DaemonSetsController
|
||||
func NewDaemonSetsController(
|
||||
ctx context.Context,
|
||||
daemonSetInformer appsinformers.DaemonSetInformer,
|
||||
historyInformer appsinformers.ControllerRevisionInformer,
|
||||
podInformer coreinformers.PodInformer,
|
||||
@ -138,7 +139,7 @@ func NewDaemonSetsController(
|
||||
failedPodsBackoff *flowcontrol.Backoff,
|
||||
) (*DaemonSetsController, error) {
|
||||
eventBroadcaster := record.NewBroadcaster()
|
||||
|
||||
logger := klog.FromContext(ctx)
|
||||
dsc := &DaemonSetsController{
|
||||
kubeClient: kubeClient,
|
||||
eventBroadcaster: eventBroadcaster,
|
||||
@ -156,17 +157,29 @@ func NewDaemonSetsController(
|
||||
}
|
||||
|
||||
daemonSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: dsc.addDaemonset,
|
||||
UpdateFunc: dsc.updateDaemonset,
|
||||
DeleteFunc: dsc.deleteDaemonset,
|
||||
AddFunc: func(obj interface{}) {
|
||||
dsc.addDaemonset(logger, obj)
|
||||
},
|
||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||
dsc.updateDaemonset(logger, oldObj, newObj)
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
dsc.deleteDaemonset(logger, obj)
|
||||
},
|
||||
})
|
||||
dsc.dsLister = daemonSetInformer.Lister()
|
||||
dsc.dsStoreSynced = daemonSetInformer.Informer().HasSynced
|
||||
|
||||
historyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: dsc.addHistory,
|
||||
UpdateFunc: dsc.updateHistory,
|
||||
DeleteFunc: dsc.deleteHistory,
|
||||
AddFunc: func(obj interface{}) {
|
||||
dsc.addHistory(logger, obj)
|
||||
},
|
||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||
dsc.updateHistory(logger, oldObj, newObj)
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
dsc.deleteHistory(logger, obj)
|
||||
},
|
||||
})
|
||||
dsc.historyLister = historyInformer.Lister()
|
||||
dsc.historyStoreSynced = historyInformer.Informer().HasSynced
|
||||
@ -174,16 +187,26 @@ func NewDaemonSetsController(
|
||||
// Watch for creation/deletion of pods. The reason we watch is that we don't want a daemon set to create/delete
|
||||
// more pods until all the effects (expectations) of a daemon set's create/delete have been observed.
|
||||
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: dsc.addPod,
|
||||
UpdateFunc: dsc.updatePod,
|
||||
DeleteFunc: dsc.deletePod,
|
||||
AddFunc: func(obj interface{}) {
|
||||
dsc.addPod(logger, obj)
|
||||
},
|
||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||
dsc.updatePod(logger, oldObj, newObj)
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
dsc.deletePod(logger, obj)
|
||||
},
|
||||
})
|
||||
dsc.podLister = podInformer.Lister()
|
||||
dsc.podStoreSynced = podInformer.Informer().HasSynced
|
||||
|
||||
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: dsc.addNode,
|
||||
UpdateFunc: dsc.updateNode,
|
||||
AddFunc: func(obj interface{}) {
|
||||
dsc.addNode(logger, obj)
|
||||
},
|
||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||
dsc.updateNode(logger, oldObj, newObj)
|
||||
},
|
||||
},
|
||||
)
|
||||
dsc.nodeStoreSynced = nodeInformer.Informer().HasSynced
|
||||
@ -197,13 +220,13 @@ func NewDaemonSetsController(
|
||||
return dsc, nil
|
||||
}
|
||||
|
||||
func (dsc *DaemonSetsController) addDaemonset(obj interface{}) {
|
||||
func (dsc *DaemonSetsController) addDaemonset(logger klog.Logger, obj interface{}) {
|
||||
ds := obj.(*apps.DaemonSet)
|
||||
klog.V(4).Infof("Adding daemon set %s", ds.Name)
|
||||
logger.V(4).Info("Adding daemon set", "daemonset", klog.KObj(ds))
|
||||
dsc.enqueueDaemonSet(ds)
|
||||
}
|
||||
|
||||
func (dsc *DaemonSetsController) updateDaemonset(cur, old interface{}) {
|
||||
func (dsc *DaemonSetsController) updateDaemonset(logger klog.Logger, cur, old interface{}) {
|
||||
oldDS := old.(*apps.DaemonSet)
|
||||
curDS := cur.(*apps.DaemonSet)
|
||||
|
||||
@ -214,17 +237,17 @@ func (dsc *DaemonSetsController) updateDaemonset(cur, old interface{}) {
|
||||
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", oldDS, err))
|
||||
return
|
||||
}
|
||||
dsc.deleteDaemonset(cache.DeletedFinalStateUnknown{
|
||||
dsc.deleteDaemonset(logger, cache.DeletedFinalStateUnknown{
|
||||
Key: key,
|
||||
Obj: oldDS,
|
||||
})
|
||||
}
|
||||
|
||||
klog.V(4).Infof("Updating daemon set %s", oldDS.Name)
|
||||
logger.V(4).Info("Updating daemon set", "daemonset", klog.KObj(oldDS))
|
||||
dsc.enqueueDaemonSet(curDS)
|
||||
}
|
||||
|
||||
func (dsc *DaemonSetsController) deleteDaemonset(obj interface{}) {
|
||||
func (dsc *DaemonSetsController) deleteDaemonset(logger klog.Logger, obj interface{}) {
|
||||
ds, ok := obj.(*apps.DaemonSet)
|
||||
if !ok {
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
@ -238,7 +261,7 @@ func (dsc *DaemonSetsController) deleteDaemonset(obj interface{}) {
|
||||
return
|
||||
}
|
||||
}
|
||||
klog.V(4).Infof("Deleting daemon set %s", ds.Name)
|
||||
logger.V(4).Info("Deleting daemon set", "daemonset", klog.KObj(ds))
|
||||
|
||||
key, err := controller.KeyFunc(ds)
|
||||
if err != nil {
|
||||
@ -262,8 +285,9 @@ func (dsc *DaemonSetsController) Run(ctx context.Context, workers int) {
|
||||
|
||||
defer dsc.queue.ShutDown()
|
||||
|
||||
klog.Infof("Starting daemon sets controller")
|
||||
defer klog.Infof("Shutting down daemon sets controller")
|
||||
logger := klog.FromContext(ctx)
|
||||
logger.Info("Starting daemon sets controller")
|
||||
defer logger.Info("Shutting down daemon sets controller")
|
||||
|
||||
if !cache.WaitForNamedCacheSync("daemon sets", ctx.Done(), dsc.podStoreSynced, dsc.nodeStoreSynced, dsc.historyStoreSynced, dsc.dsStoreSynced) {
|
||||
return
|
||||
@ -341,7 +365,7 @@ func (dsc *DaemonSetsController) getDaemonSetsForPod(pod *v1.Pod) []*apps.Daemon
|
||||
|
||||
// getDaemonSetsForHistory returns a list of DaemonSets that potentially
|
||||
// match a ControllerRevision.
|
||||
func (dsc *DaemonSetsController) getDaemonSetsForHistory(history *apps.ControllerRevision) []*apps.DaemonSet {
|
||||
func (dsc *DaemonSetsController) getDaemonSetsForHistory(logger klog.Logger, history *apps.ControllerRevision) []*apps.DaemonSet {
|
||||
daemonSets, err := dsc.dsLister.GetHistoryDaemonSets(history)
|
||||
if err != nil || len(daemonSets) == 0 {
|
||||
return nil
|
||||
@ -349,20 +373,20 @@ func (dsc *DaemonSetsController) getDaemonSetsForHistory(history *apps.Controlle
|
||||
if len(daemonSets) > 1 {
|
||||
// ControllerRef will ensure we don't do anything crazy, but more than one
|
||||
// item in this list nevertheless constitutes user error.
|
||||
klog.V(4).Infof("User error! more than one DaemonSets is selecting ControllerRevision %s/%s with labels: %#v",
|
||||
history.Namespace, history.Name, history.Labels)
|
||||
logger.V(4).Info("Found more than one DaemonSet selecting the ControllerRevision. This is potentially a user error",
|
||||
"controllerRevision", klog.KObj(history), "labels", history.Labels)
|
||||
}
|
||||
return daemonSets
|
||||
}
|
||||
|
||||
// addHistory enqueues the DaemonSet that manages a ControllerRevision when the ControllerRevision is created
|
||||
// or when the controller manager is restarted.
|
||||
func (dsc *DaemonSetsController) addHistory(obj interface{}) {
|
||||
func (dsc *DaemonSetsController) addHistory(logger klog.Logger, obj interface{}) {
|
||||
history := obj.(*apps.ControllerRevision)
|
||||
if history.DeletionTimestamp != nil {
|
||||
// On a restart of the controller manager, it's possible for an object to
|
||||
// show up in a state that is already pending deletion.
|
||||
dsc.deleteHistory(history)
|
||||
dsc.deleteHistory(logger, history)
|
||||
return
|
||||
}
|
||||
|
||||
@ -372,17 +396,17 @@ func (dsc *DaemonSetsController) addHistory(obj interface{}) {
|
||||
if ds == nil {
|
||||
return
|
||||
}
|
||||
klog.V(4).Infof("ControllerRevision %s added.", history.Name)
|
||||
logger.V(4).Info("Observed a ControllerRevision", "controllerRevision", klog.KObj(history))
|
||||
return
|
||||
}
|
||||
|
||||
// Otherwise, it's an orphan. Get a list of all matching DaemonSets and sync
|
||||
// them to see if anyone wants to adopt it.
|
||||
daemonSets := dsc.getDaemonSetsForHistory(history)
|
||||
daemonSets := dsc.getDaemonSetsForHistory(logger, history)
|
||||
if len(daemonSets) == 0 {
|
||||
return
|
||||
}
|
||||
klog.V(4).Infof("Orphan ControllerRevision %s added.", history.Name)
|
||||
logger.V(4).Info("Orphan ControllerRevision added", "controllerRevision", klog.KObj(history))
|
||||
for _, ds := range daemonSets {
|
||||
dsc.enqueueDaemonSet(ds)
|
||||
}
|
||||
@ -391,7 +415,7 @@ func (dsc *DaemonSetsController) addHistory(obj interface{}) {
|
||||
// updateHistory figures out what DaemonSet(s) manage a ControllerRevision when the ControllerRevision
|
||||
// is updated and wake them up. If anything of the ControllerRevision has changed, we need to awaken
|
||||
// both the old and new DaemonSets.
|
||||
func (dsc *DaemonSetsController) updateHistory(old, cur interface{}) {
|
||||
func (dsc *DaemonSetsController) updateHistory(logger klog.Logger, old, cur interface{}) {
|
||||
curHistory := cur.(*apps.ControllerRevision)
|
||||
oldHistory := old.(*apps.ControllerRevision)
|
||||
if curHistory.ResourceVersion == oldHistory.ResourceVersion {
|
||||
@ -415,7 +439,7 @@ func (dsc *DaemonSetsController) updateHistory(old, cur interface{}) {
|
||||
if ds == nil {
|
||||
return
|
||||
}
|
||||
klog.V(4).Infof("ControllerRevision %s updated.", curHistory.Name)
|
||||
logger.V(4).Info("Observed an update to a ControllerRevision", "controllerRevision", klog.KObj(curHistory))
|
||||
dsc.enqueueDaemonSet(ds)
|
||||
return
|
||||
}
|
||||
@ -424,11 +448,11 @@ func (dsc *DaemonSetsController) updateHistory(old, cur interface{}) {
|
||||
// to see if anyone wants to adopt it now.
|
||||
labelChanged := !reflect.DeepEqual(curHistory.Labels, oldHistory.Labels)
|
||||
if labelChanged || controllerRefChanged {
|
||||
daemonSets := dsc.getDaemonSetsForHistory(curHistory)
|
||||
daemonSets := dsc.getDaemonSetsForHistory(logger, curHistory)
|
||||
if len(daemonSets) == 0 {
|
||||
return
|
||||
}
|
||||
klog.V(4).Infof("Orphan ControllerRevision %s updated.", curHistory.Name)
|
||||
logger.V(4).Info("Orphan ControllerRevision updated", "controllerRevision", klog.KObj(curHistory))
|
||||
for _, ds := range daemonSets {
|
||||
dsc.enqueueDaemonSet(ds)
|
||||
}
|
||||
@ -438,7 +462,7 @@ func (dsc *DaemonSetsController) updateHistory(old, cur interface{}) {
|
||||
// deleteHistory enqueues the DaemonSet that manages a ControllerRevision when
|
||||
// the ControllerRevision is deleted. obj could be an *app.ControllerRevision, or
|
||||
// a DeletionFinalStateUnknown marker item.
|
||||
func (dsc *DaemonSetsController) deleteHistory(obj interface{}) {
|
||||
func (dsc *DaemonSetsController) deleteHistory(logger klog.Logger, obj interface{}) {
|
||||
history, ok := obj.(*apps.ControllerRevision)
|
||||
|
||||
// When a delete is dropped, the relist will notice a ControllerRevision in the store not
|
||||
@ -467,17 +491,17 @@ func (dsc *DaemonSetsController) deleteHistory(obj interface{}) {
|
||||
if ds == nil {
|
||||
return
|
||||
}
|
||||
klog.V(4).Infof("ControllerRevision %s deleted.", history.Name)
|
||||
logger.V(4).Info("ControllerRevision deleted", "controllerRevision", klog.KObj(history))
|
||||
dsc.enqueueDaemonSet(ds)
|
||||
}
|
||||
|
||||
func (dsc *DaemonSetsController) addPod(obj interface{}) {
|
||||
func (dsc *DaemonSetsController) addPod(logger klog.Logger, obj interface{}) {
|
||||
pod := obj.(*v1.Pod)
|
||||
|
||||
if pod.DeletionTimestamp != nil {
|
||||
// on a restart of the controller manager, it's possible a new pod shows up in a state that
|
||||
// is already pending deletion. Prevent the pod from being a creation observation.
|
||||
dsc.deletePod(pod)
|
||||
dsc.deletePod(logger, pod)
|
||||
return
|
||||
}
|
||||
|
||||
@ -491,7 +515,7 @@ func (dsc *DaemonSetsController) addPod(obj interface{}) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
klog.V(4).Infof("Pod %s added.", pod.Name)
|
||||
logger.V(4).Info("Pod added", "pod", klog.KObj(pod))
|
||||
dsc.expectations.CreationObserved(dsKey)
|
||||
dsc.enqueueDaemonSet(ds)
|
||||
return
|
||||
@ -505,7 +529,7 @@ func (dsc *DaemonSetsController) addPod(obj interface{}) {
|
||||
if len(dss) == 0 {
|
||||
return
|
||||
}
|
||||
klog.V(4).Infof("Orphan Pod %s added.", pod.Name)
|
||||
logger.V(4).Info("Orphan Pod added", "pod", klog.KObj(pod))
|
||||
for _, ds := range dss {
|
||||
dsc.enqueueDaemonSet(ds)
|
||||
}
|
||||
@ -514,7 +538,7 @@ func (dsc *DaemonSetsController) addPod(obj interface{}) {
|
||||
// When a pod is updated, figure out what sets manage it and wake them
|
||||
// up. If the labels of the pod have changed we need to awaken both the old
|
||||
// and new set. old and cur must be *v1.Pod types.
|
||||
func (dsc *DaemonSetsController) updatePod(old, cur interface{}) {
|
||||
func (dsc *DaemonSetsController) updatePod(logger klog.Logger, old, cur interface{}) {
|
||||
curPod := cur.(*v1.Pod)
|
||||
oldPod := old.(*v1.Pod)
|
||||
if curPod.ResourceVersion == oldPod.ResourceVersion {
|
||||
@ -528,7 +552,7 @@ func (dsc *DaemonSetsController) updatePod(old, cur interface{}) {
|
||||
// and after such time has passed, the kubelet actually deletes it from the store. We receive an update
|
||||
// for modification of the deletion timestamp and expect an ds to create more replicas asap, not wait
|
||||
// until the kubelet actually deletes the pod.
|
||||
dsc.deletePod(curPod)
|
||||
dsc.deletePod(logger, curPod)
|
||||
return
|
||||
}
|
||||
|
||||
@ -548,7 +572,7 @@ func (dsc *DaemonSetsController) updatePod(old, cur interface{}) {
|
||||
if ds == nil {
|
||||
return
|
||||
}
|
||||
klog.V(4).Infof("Pod %s updated.", curPod.Name)
|
||||
logger.V(4).Info("Pod updated", "pod", klog.KObj(curPod))
|
||||
dsc.enqueueDaemonSet(ds)
|
||||
changedToReady := !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod)
|
||||
// See https://github.com/kubernetes/kubernetes/pull/38076 for more details
|
||||
@ -566,7 +590,7 @@ func (dsc *DaemonSetsController) updatePod(old, cur interface{}) {
|
||||
if len(dss) == 0 {
|
||||
return
|
||||
}
|
||||
klog.V(4).Infof("Orphan Pod %s updated.", curPod.Name)
|
||||
logger.V(4).Info("Orphan Pod updated", "pod", klog.KObj(curPod))
|
||||
labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
|
||||
if labelChanged || controllerRefChanged {
|
||||
for _, ds := range dss {
|
||||
@ -575,7 +599,7 @@ func (dsc *DaemonSetsController) updatePod(old, cur interface{}) {
|
||||
}
|
||||
}
|
||||
|
||||
func (dsc *DaemonSetsController) deletePod(obj interface{}) {
|
||||
func (dsc *DaemonSetsController) deletePod(logger klog.Logger, obj interface{}) {
|
||||
pod, ok := obj.(*v1.Pod)
|
||||
// When a delete is dropped, the relist will notice a pod in the store not
|
||||
// in the list, leading to the insertion of a tombstone object which contains
|
||||
@ -608,16 +632,16 @@ func (dsc *DaemonSetsController) deletePod(obj interface{}) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
klog.V(4).Infof("Pod %s deleted.", pod.Name)
|
||||
logger.V(4).Info("Pod deleted", "pod", klog.KObj(pod))
|
||||
dsc.expectations.DeletionObserved(dsKey)
|
||||
dsc.enqueueDaemonSet(ds)
|
||||
}
|
||||
|
||||
func (dsc *DaemonSetsController) addNode(obj interface{}) {
|
||||
func (dsc *DaemonSetsController) addNode(logger klog.Logger, obj interface{}) {
|
||||
// TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too).
|
||||
dsList, err := dsc.dsLister.List(labels.Everything())
|
||||
if err != nil {
|
||||
klog.V(4).Infof("Error enqueueing daemon sets: %v", err)
|
||||
logger.V(4).Info("Error enqueueing daemon sets", "err", err)
|
||||
return
|
||||
}
|
||||
node := obj.(*v1.Node)
|
||||
@ -666,7 +690,7 @@ func shouldIgnoreNodeUpdate(oldNode, curNode v1.Node) bool {
|
||||
return apiequality.Semantic.DeepEqual(oldNode, curNode)
|
||||
}
|
||||
|
||||
func (dsc *DaemonSetsController) updateNode(old, cur interface{}) {
|
||||
func (dsc *DaemonSetsController) updateNode(logger klog.Logger, old, cur interface{}) {
|
||||
oldNode := old.(*v1.Node)
|
||||
curNode := cur.(*v1.Node)
|
||||
if shouldIgnoreNodeUpdate(*oldNode, *curNode) {
|
||||
@ -675,7 +699,7 @@ func (dsc *DaemonSetsController) updateNode(old, cur interface{}) {
|
||||
|
||||
dsList, err := dsc.dsLister.List(labels.Everything())
|
||||
if err != nil {
|
||||
klog.V(4).Infof("Error listing daemon sets: %v", err)
|
||||
logger.V(4).Info("Error listing daemon sets", "err", err)
|
||||
return
|
||||
}
|
||||
// TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too).
|
||||
@ -733,11 +757,12 @@ func (dsc *DaemonSetsController) getNodesToDaemonPods(ctx context.Context, ds *a
|
||||
}
|
||||
// Group Pods by Node name.
|
||||
nodeToDaemonPods := make(map[string][]*v1.Pod)
|
||||
logger := klog.FromContext(ctx)
|
||||
for _, pod := range claimedPods {
|
||||
nodeName, err := util.GetTargetNodeName(pod)
|
||||
if err != nil {
|
||||
klog.Warningf("Failed to get target node name of Pod %v/%v in DaemonSet %v/%v",
|
||||
pod.Namespace, pod.Name, ds.Namespace, ds.Name)
|
||||
logger.Info("Failed to get target node name of Pod in DaemonSet",
|
||||
"pod", klog.KObj(pod), "daemonset", klog.KObj(ds))
|
||||
continue
|
||||
}
|
||||
|
||||
@ -773,6 +798,7 @@ func (dsc *DaemonSetsController) resolveControllerRef(namespace string, controll
|
||||
// - podsToDelete: the Pods need to be deleted on the node
|
||||
// - err: unexpected error
|
||||
func (dsc *DaemonSetsController) podsShouldBeOnNode(
|
||||
logger klog.Logger,
|
||||
node *v1.Node,
|
||||
nodeToDaemonPods map[string][]*v1.Pod,
|
||||
ds *apps.DaemonSet,
|
||||
@ -803,8 +829,8 @@ func (dsc *DaemonSetsController) podsShouldBeOnNode(
|
||||
inBackoff := dsc.failedPodsBackoff.IsInBackOffSinceUpdate(backoffKey, now)
|
||||
if inBackoff {
|
||||
delay := dsc.failedPodsBackoff.Get(backoffKey)
|
||||
klog.V(4).Infof("Deleting failed pod %s/%s on node %s has been limited by backoff - %v remaining",
|
||||
pod.Namespace, pod.Name, node.Name, delay)
|
||||
logger.V(4).Info("Deleting failed pod on node has been limited by backoff",
|
||||
"pod", klog.KObj(pod), "node", klog.KObj(node), "currentDelay", delay)
|
||||
dsc.enqueueDaemonSetAfter(ds, delay)
|
||||
continue
|
||||
}
|
||||
@ -812,7 +838,7 @@ func (dsc *DaemonSetsController) podsShouldBeOnNode(
|
||||
dsc.failedPodsBackoff.Next(backoffKey, now)
|
||||
|
||||
msg := fmt.Sprintf("Found failed daemon pod %s/%s on node %s, will try to kill it", pod.Namespace, pod.Name, node.Name)
|
||||
klog.V(2).Infof(msg)
|
||||
logger.V(2).Info("Found failed daemon pod on node, will try to kill it", "pod", klog.KObj(pod), "node", klog.KObj(node))
|
||||
// Emit an event so that it's discoverable to users.
|
||||
dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedDaemonPodReason, msg)
|
||||
podsToDelete = append(podsToDelete, pod.Name)
|
||||
@ -866,10 +892,10 @@ func (dsc *DaemonSetsController) podsShouldBeOnNode(
|
||||
if oldestNewPod != nil && oldestOldPod != nil {
|
||||
switch {
|
||||
case !podutil.IsPodReady(oldestOldPod):
|
||||
klog.V(5).Infof("Pod %s/%s from daemonset %s is no longer ready and will be replaced with newer pod %s", oldestOldPod.Namespace, oldestOldPod.Name, ds.Name, oldestNewPod.Name)
|
||||
logger.V(5).Info("Pod from daemonset is no longer ready and will be replaced with newer pod", "oldPod", klog.KObj(oldestOldPod), "daemonset", klog.KObj(ds), "newPod", klog.KObj(oldestNewPod))
|
||||
podsToDelete = append(podsToDelete, oldestOldPod.Name)
|
||||
case podutil.IsPodAvailable(oldestNewPod, ds.Spec.MinReadySeconds, metav1.Time{Time: dsc.failedPodsBackoff.Clock.Now()}):
|
||||
klog.V(5).Infof("Pod %s/%s from daemonset %s is now ready and will replace older pod %s", oldestNewPod.Namespace, oldestNewPod.Name, ds.Name, oldestOldPod.Name)
|
||||
logger.V(5).Info("Pod from daemonset is now ready and will replace older pod", "newPod", klog.KObj(oldestNewPod), "daemonset", klog.KObj(ds), "oldPod", klog.KObj(oldestOldPod))
|
||||
podsToDelete = append(podsToDelete, oldestOldPod.Name)
|
||||
}
|
||||
}
|
||||
@ -926,10 +952,11 @@ func (dsc *DaemonSetsController) manage(ctx context.Context, ds *apps.DaemonSet,
|
||||
|
||||
// For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon
|
||||
// pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node.
|
||||
logger := klog.FromContext(ctx)
|
||||
var nodesNeedingDaemonPods, podsToDelete []string
|
||||
for _, node := range nodeList {
|
||||
nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode := dsc.podsShouldBeOnNode(
|
||||
node, nodeToDaemonPods, ds, hash)
|
||||
logger, node, nodeToDaemonPods, ds, hash)
|
||||
|
||||
nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodesNeedingDaemonPodsOnNode...)
|
||||
podsToDelete = append(podsToDelete, podsToDeleteOnNode...)
|
||||
@ -951,6 +978,7 @@ func (dsc *DaemonSetsController) manage(ctx context.Context, ds *apps.DaemonSet,
|
||||
// returns slice with errors if any
|
||||
func (dsc *DaemonSetsController) syncNodes(ctx context.Context, ds *apps.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error {
|
||||
// We need to set expectations before creating/deleting pods to avoid race conditions.
|
||||
logger := klog.FromContext(ctx)
|
||||
dsKey, err := controller.KeyFunc(ds)
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
|
||||
@ -971,7 +999,7 @@ func (dsc *DaemonSetsController) syncNodes(ctx context.Context, ds *apps.DaemonS
|
||||
// error channel to communicate back failures. make the buffer big enough to avoid any blocking
|
||||
errCh := make(chan error, createDiff+deleteDiff)
|
||||
|
||||
klog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v, creating %d", ds.Name, nodesNeedingDaemonPods, createDiff)
|
||||
logger.V(4).Info("Nodes needing daemon pods for daemon set, creating", "daemonset", klog.KObj(ds), "needCount", nodesNeedingDaemonPods, "createCount", createDiff)
|
||||
createWait := sync.WaitGroup{}
|
||||
// If the returned error is not nil we have a parse error.
|
||||
// The controller handles this via the hash.
|
||||
@ -1014,7 +1042,7 @@ func (dsc *DaemonSetsController) syncNodes(ctx context.Context, ds *apps.DaemonS
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
klog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
|
||||
logger.V(2).Info("Failed creation, decrementing expectations for daemon set", "daemonset", klog.KObj(ds))
|
||||
dsc.expectations.CreationObserved(dsKey)
|
||||
errCh <- err
|
||||
utilruntime.HandleError(err)
|
||||
@ -1025,7 +1053,7 @@ func (dsc *DaemonSetsController) syncNodes(ctx context.Context, ds *apps.DaemonS
|
||||
// any skipped pods that we never attempted to start shouldn't be expected.
|
||||
skippedPods := createDiff - (batchSize + pos)
|
||||
if errorCount < len(errCh) && skippedPods > 0 {
|
||||
klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for set %q/%q", skippedPods, ds.Namespace, ds.Name)
|
||||
logger.V(2).Info("Slow-start failure. Skipping creation pods, decrementing expectations for daemon set", "skippedPods", skippedPods, "daemonset", klog.KObj(ds))
|
||||
dsc.expectations.LowerExpectations(dsKey, skippedPods, 0)
|
||||
// The skipped pods will be retried later. The next controller resync will
|
||||
// retry the slow start process.
|
||||
@ -1033,7 +1061,7 @@ func (dsc *DaemonSetsController) syncNodes(ctx context.Context, ds *apps.DaemonS
|
||||
}
|
||||
}
|
||||
|
||||
klog.V(4).Infof("Pods to delete for daemon set %s: %+v, deleting %d", ds.Name, podsToDelete, deleteDiff)
|
||||
logger.V(4).Info("Pods to delete for daemon set, deleting", "daemonset", klog.KObj(ds), "toDeleteCount", podsToDelete, "deleteCount", deleteDiff)
|
||||
deleteWait := sync.WaitGroup{}
|
||||
deleteWait.Add(deleteDiff)
|
||||
for i := 0; i < deleteDiff; i++ {
|
||||
@ -1042,7 +1070,7 @@ func (dsc *DaemonSetsController) syncNodes(ctx context.Context, ds *apps.DaemonS
|
||||
if err := dsc.podControl.DeletePod(ctx, ds.Namespace, podsToDelete[ix], ds); err != nil {
|
||||
dsc.expectations.DeletionObserved(dsKey)
|
||||
if !apierrors.IsNotFound(err) {
|
||||
klog.V(2).Infof("Failed deletion, decremented expectations for set %q/%q", ds.Namespace, ds.Name)
|
||||
logger.V(2).Info("Failed deletion, decremented expectations for daemon set", "daemonset", klog.KObj(ds))
|
||||
errCh <- err
|
||||
utilruntime.HandleError(err)
|
||||
}
|
||||
@ -1116,7 +1144,8 @@ func storeDaemonSetStatus(
|
||||
}
|
||||
|
||||
func (dsc *DaemonSetsController) updateDaemonSetStatus(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string, updateObservedGen bool) error {
|
||||
klog.V(4).Infof("Updating daemon set status")
|
||||
logger := klog.FromContext(ctx)
|
||||
logger.V(4).Info("Updating daemon set status")
|
||||
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds)
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
|
||||
@ -1175,10 +1204,11 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ctx context.Context, ds *
|
||||
}
|
||||
|
||||
func (dsc *DaemonSetsController) syncDaemonSet(ctx context.Context, key string) error {
|
||||
logger := klog.FromContext(ctx)
|
||||
startTime := dsc.failedPodsBackoff.Clock.Now()
|
||||
|
||||
defer func() {
|
||||
klog.V(4).Infof("Finished syncing daemon set %q (%v)", key, dsc.failedPodsBackoff.Clock.Now().Sub(startTime))
|
||||
logger.V(4).Info("Finished syncing daemon set", "daemonset", key, "time", dsc.failedPodsBackoff.Clock.Now().Sub(startTime))
|
||||
}()
|
||||
|
||||
namespace, name, err := cache.SplitMetaNamespaceKey(key)
|
||||
@ -1187,7 +1217,7 @@ func (dsc *DaemonSetsController) syncDaemonSet(ctx context.Context, key string)
|
||||
}
|
||||
ds, err := dsc.dsLister.DaemonSets(namespace).Get(name)
|
||||
if apierrors.IsNotFound(err) {
|
||||
klog.V(3).Infof("daemon set has been deleted %v", key)
|
||||
logger.V(3).Info("Daemon set has been deleted", "daemonset", key)
|
||||
dsc.expectations.DeleteExpectations(key)
|
||||
return nil
|
||||
}
|
||||
@ -1244,7 +1274,7 @@ func (dsc *DaemonSetsController) syncDaemonSet(ctx context.Context, key string)
|
||||
case err != nil && statusErr != nil:
|
||||
// If there was an error, and we failed to update status,
|
||||
// log it and return the original error.
|
||||
klog.ErrorS(statusErr, "Failed to update status", "daemonSet", klog.KObj(ds))
|
||||
klog.Error(statusErr, "Failed to update status", "daemonSet", klog.KObj(ds))
|
||||
return err
|
||||
case err != nil:
|
||||
return err
|
||||
|
@ -44,7 +44,7 @@ import (
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/client-go/util/flowcontrol"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
||||
api "k8s.io/kubernetes/pkg/apis/core"
|
||||
"k8s.io/kubernetes/pkg/apis/scheduling"
|
||||
@ -310,11 +310,12 @@ type daemonSetsController struct {
|
||||
fakeRecorder *record.FakeRecorder
|
||||
}
|
||||
|
||||
func newTestController(initialObjects ...runtime.Object) (*daemonSetsController, *fakePodControl, *fake.Clientset, error) {
|
||||
func newTestController(ctx context.Context, initialObjects ...runtime.Object) (*daemonSetsController, *fakePodControl, *fake.Clientset, error) {
|
||||
clientset := fake.NewSimpleClientset(initialObjects...)
|
||||
informerFactory := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
|
||||
|
||||
dsc, err := NewDaemonSetsController(
|
||||
ctx,
|
||||
informerFactory.Apps().V1().DaemonSets(),
|
||||
informerFactory.Apps().V1().ControllerRevisions(),
|
||||
informerFactory.Core().V1().Pods(),
|
||||
@ -417,7 +418,7 @@ func expectSyncDaemonSetsWithError(t *testing.T, manager *daemonSetsController,
|
||||
// clearExpectations copies the FakePodControl to PodStore and clears the create and delete expectations.
|
||||
func clearExpectations(t *testing.T, manager *daemonSetsController, ds *apps.DaemonSet, fakePodControl *fakePodControl) {
|
||||
fakePodControl.Clear()
|
||||
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
key, err := controller.KeyFunc(ds)
|
||||
if err != nil {
|
||||
t.Errorf("Could not get key for daemon.")
|
||||
@ -455,13 +456,14 @@ func clearExpectations(t *testing.T, manager *daemonSetsController, ds *apps.Dae
|
||||
}
|
||||
sort.Strings(lines)
|
||||
for _, line := range lines {
|
||||
klog.Info(line)
|
||||
logger.Info(line)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeleteFinalStateUnknown(t *testing.T) {
|
||||
for _, strategy := range updateStrategies() {
|
||||
manager, _, _, err := newTestController()
|
||||
logger, ctx := ktesting.NewTestContext(t)
|
||||
manager, _, _, err := newTestController(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -469,7 +471,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) {
|
||||
ds := newDaemonSet("foo")
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
// DeletedFinalStateUnknown should queue the embedded DS if found.
|
||||
manager.deleteDaemonset(cache.DeletedFinalStateUnknown{Key: "foo", Obj: ds})
|
||||
manager.deleteDaemonset(logger, cache.DeletedFinalStateUnknown{Key: "foo", Obj: ds})
|
||||
enqueuedKey, _ := manager.queue.Get()
|
||||
if enqueuedKey.(string) != "default/foo" {
|
||||
t.Errorf("expected delete of DeletedFinalStateUnknown to enqueue the daemonset but found: %#v", enqueuedKey)
|
||||
@ -482,8 +484,10 @@ func TestExpectationsOnRecreate(t *testing.T) {
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
f := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
|
||||
dsc, err := NewDaemonSetsController(
|
||||
ctx,
|
||||
f.Apps().V1().DaemonSets(),
|
||||
f.Apps().V1().ControllerRevisions(),
|
||||
f.Core().V1().Pods(),
|
||||
@ -685,7 +689,8 @@ func TestSimpleDaemonSetLaunchesPods(t *testing.T) {
|
||||
for _, strategy := range updateStrategies() {
|
||||
ds := newDaemonSet("foo")
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -705,7 +710,8 @@ func TestSimpleDaemonSetScheduleDaemonSetPodsLaunchesPods(t *testing.T) {
|
||||
for _, strategy := range updateStrategies() {
|
||||
ds := newDaemonSet("foo")
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -781,7 +787,8 @@ func TestSimpleDaemonSetPodCreateErrors(t *testing.T) {
|
||||
for _, strategy := range updateStrategies() {
|
||||
ds := newDaemonSet("foo")
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
manager, podControl, clientset, err := newTestController(ds)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, clientset, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -832,7 +839,8 @@ func TestDaemonSetPodCreateExpectationsError(t *testing.T) {
|
||||
for _, strategy := range strategies {
|
||||
ds := newDaemonSet("foo")
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -861,7 +869,8 @@ func TestSimpleDaemonSetUpdatesStatusAfterLaunchingPods(t *testing.T) {
|
||||
for _, strategy := range updateStrategies() {
|
||||
ds := newDaemonSet("foo")
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
manager, podControl, clientset, err := newTestController(ds)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, clientset, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -928,7 +937,8 @@ func TestSimpleDaemonSetUpdatesStatusError(t *testing.T) {
|
||||
for _, strategy := range updateStrategies() {
|
||||
ds := newDaemonSet("foo")
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
manager, podControl, clientset, err := newTestController(ds)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, clientset, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -960,7 +970,8 @@ func TestSimpleDaemonSetUpdatesStatusError(t *testing.T) {
|
||||
// DaemonSets should do nothing if there aren't any nodes
|
||||
func TestNoNodesDoesNothing(t *testing.T) {
|
||||
for _, strategy := range updateStrategies() {
|
||||
manager, podControl, _, err := newTestController()
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, _, err := newTestController(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -981,7 +992,8 @@ func TestOneNodeDaemonLaunchesPod(t *testing.T) {
|
||||
for _, strategy := range updateStrategies() {
|
||||
ds := newDaemonSet("foo")
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -1003,7 +1015,8 @@ func TestNotReadyNodeDaemonDoesLaunchPod(t *testing.T) {
|
||||
for _, strategy := range updateStrategies() {
|
||||
ds := newDaemonSet("foo")
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -1068,7 +1081,8 @@ func TestInsufficientCapacityNodeDaemonDoesNotUnscheduleRunningPod(t *testing.T)
|
||||
ds := newDaemonSet("foo")
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
ds.Spec.Template.Spec = podSpec
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -1101,11 +1115,12 @@ func TestInsufficientCapacityNodeDaemonDoesNotUnscheduleRunningPod(t *testing.T)
|
||||
|
||||
// DaemonSets should only place onto nodes with sufficient free resource and matched node selector
|
||||
func TestInsufficientCapacityNodeSufficientCapacityWithNodeLabelDaemonLaunchPod(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
podSpec := resourcePodSpecWithoutNodeName("50M", "75m")
|
||||
ds := newDaemonSet("foo")
|
||||
ds.Spec.Template.Spec = podSpec
|
||||
ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -1137,7 +1152,8 @@ func TestNetworkUnavailableNodeDaemonLaunchesPod(t *testing.T) {
|
||||
for _, strategy := range updateStrategies() {
|
||||
ds := newDaemonSet("simple")
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -1168,7 +1184,8 @@ func TestDontDoAnythingIfBeingDeleted(t *testing.T) {
|
||||
ds.Spec.Template.Spec = podSpec
|
||||
now := metav1.Now()
|
||||
ds.DeletionTimestamp = &now
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -1199,7 +1216,8 @@ func TestDontDoAnythingIfBeingDeletedRace(t *testing.T) {
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
now := metav1.Now()
|
||||
ds.DeletionTimestamp = &now
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -1238,7 +1256,8 @@ func TestPortConflictWithSameDaemonPodDoesNotDeletePod(t *testing.T) {
|
||||
}},
|
||||
}},
|
||||
}
|
||||
manager, podControl, _, err := newTestController()
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, _, err := newTestController(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -1285,7 +1304,8 @@ func TestNoPortConflictNodeDaemonLaunchesPod(t *testing.T) {
|
||||
ds := newDaemonSet("foo")
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
ds.Spec.Template.Spec = podSpec2
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -1328,7 +1348,8 @@ func TestPodIsNotDeletedByDaemonsetWithEmptyLabelSelector(t *testing.T) {
|
||||
ds.Spec.Selector = &ls
|
||||
ds.Spec.Template.Spec.NodeSelector = map[string]string{"foo": "bar"}
|
||||
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -1363,7 +1384,8 @@ func TestDealsWithExistingPods(t *testing.T) {
|
||||
for _, strategy := range updateStrategies() {
|
||||
ds := newDaemonSet("foo")
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -1386,7 +1408,8 @@ func TestSelectorDaemonLaunchesPods(t *testing.T) {
|
||||
daemon := newDaemonSet("foo")
|
||||
daemon.Spec.UpdateStrategy = *strategy
|
||||
daemon.Spec.Template.Spec.NodeSelector = simpleNodeLabel
|
||||
manager, podControl, _, err := newTestController(daemon)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, _, err := newTestController(ctx, daemon)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -1406,7 +1429,8 @@ func TestSelectorDaemonDeletesUnselectedPods(t *testing.T) {
|
||||
ds := newDaemonSet("foo")
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -1430,7 +1454,8 @@ func TestSelectorDaemonDealsWithExistingPods(t *testing.T) {
|
||||
ds := newDaemonSet("foo")
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -1455,7 +1480,8 @@ func TestSelectorDaemonDealsWithExistingPods(t *testing.T) {
|
||||
// DaemonSet with node selector which does not match any node labels should not launch pods.
|
||||
func TestBadSelectorDaemonDoesNothing(t *testing.T) {
|
||||
for _, strategy := range updateStrategies() {
|
||||
manager, podControl, _, err := newTestController()
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, _, err := newTestController(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -1478,7 +1504,8 @@ func TestNameDaemonSetLaunchesPods(t *testing.T) {
|
||||
ds := newDaemonSet("foo")
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
ds.Spec.Template.Spec.NodeName = "node-0"
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -1497,7 +1524,8 @@ func TestBadNameDaemonSetDoesNothing(t *testing.T) {
|
||||
ds := newDaemonSet("foo")
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
ds.Spec.Template.Spec.NodeName = "node-10"
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -1517,7 +1545,8 @@ func TestNameAndSelectorDaemonSetLaunchesPods(t *testing.T) {
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel
|
||||
ds.Spec.Template.Spec.NodeName = "node-6"
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -1538,7 +1567,8 @@ func TestInconsistentNameSelectorDaemonSetDoesNothing(t *testing.T) {
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel
|
||||
ds.Spec.Template.Spec.NodeName = "node-0"
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -1554,9 +1584,10 @@ func TestInconsistentNameSelectorDaemonSetDoesNothing(t *testing.T) {
|
||||
|
||||
// DaemonSet with node selector, matching some nodes, should launch pods on all the nodes.
|
||||
func TestSelectorDaemonSetLaunchesPods(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ds := newDaemonSet("foo")
|
||||
ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -1591,8 +1622,8 @@ func TestNodeAffinityDaemonLaunchesPods(t *testing.T) {
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
manager, podControl, _, err := newTestController(daemon)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, _, err := newTestController(ctx, daemon)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSetsController: %v", err)
|
||||
}
|
||||
@ -1610,7 +1641,8 @@ func TestNumberReadyStatus(t *testing.T) {
|
||||
for _, strategy := range updateStrategies() {
|
||||
ds := newDaemonSet("foo")
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
manager, podControl, clientset, err := newTestController(ds)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, clientset, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -1656,7 +1688,8 @@ func TestObservedGeneration(t *testing.T) {
|
||||
ds := newDaemonSet("foo")
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
ds.Generation = 1
|
||||
manager, podControl, clientset, err := newTestController(ds)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, clientset, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -1702,7 +1735,8 @@ func TestDaemonKillFailedPods(t *testing.T) {
|
||||
for _, strategy := range updateStrategies() {
|
||||
ds := newDaemonSet("foo")
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -1723,10 +1757,11 @@ func TestDaemonKillFailedPods(t *testing.T) {
|
||||
func TestDaemonKillFailedPodsBackoff(t *testing.T) {
|
||||
for _, strategy := range updateStrategies() {
|
||||
t.Run(string(strategy.Type), func(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ds := newDaemonSet("foo")
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -1794,7 +1829,8 @@ func TestNoScheduleTaintedDoesntEvicitRunningIntolerantPod(t *testing.T) {
|
||||
for _, strategy := range updateStrategies() {
|
||||
ds := newDaemonSet("intolerant")
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -1824,7 +1860,8 @@ func TestNoExecuteTaintedDoesEvicitRunningIntolerantPod(t *testing.T) {
|
||||
for _, strategy := range updateStrategies() {
|
||||
ds := newDaemonSet("intolerant")
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -1853,7 +1890,8 @@ func TestTaintedNodeDaemonDoesNotLaunchIntolerantPod(t *testing.T) {
|
||||
for _, strategy := range updateStrategies() {
|
||||
ds := newDaemonSet("intolerant")
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -1879,7 +1917,8 @@ func TestTaintedNodeDaemonLaunchesToleratePod(t *testing.T) {
|
||||
ds := newDaemonSet("tolerate")
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
setDaemonSetToleration(ds, noScheduleTolerations)
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -1904,7 +1943,8 @@ func TestNotReadyNodeDaemonLaunchesPod(t *testing.T) {
|
||||
for _, strategy := range updateStrategies() {
|
||||
ds := newDaemonSet("simple")
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -1932,7 +1972,8 @@ func TestUnreachableNodeDaemonLaunchesPod(t *testing.T) {
|
||||
for _, strategy := range updateStrategies() {
|
||||
ds := newDaemonSet("simple")
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -1961,7 +2002,8 @@ func TestNodeDaemonLaunchesToleratePod(t *testing.T) {
|
||||
ds := newDaemonSet("tolerate")
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
setDaemonSetToleration(ds, noScheduleTolerations)
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -1980,7 +2022,8 @@ func TestDaemonSetRespectsTermination(t *testing.T) {
|
||||
for _, strategy := range updateStrategies() {
|
||||
ds := newDaemonSet("foo")
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -2015,7 +2058,8 @@ func TestTaintPressureNodeDaemonLaunchesPod(t *testing.T) {
|
||||
ds := newDaemonSet("critical")
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
setDaemonSetCritical(ds)
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -2329,7 +2373,8 @@ func TestNodeShouldRunDaemonPod(t *testing.T) {
|
||||
node.Status.Conditions = append(node.Status.Conditions, c.nodeCondition...)
|
||||
node.Status.Allocatable = allocatableResources("100M", "1")
|
||||
node.Spec.Unschedulable = c.nodeUnschedulable
|
||||
manager, _, _, err := newTestController()
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, _, _, err := newTestController(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -2431,7 +2476,8 @@ func TestUpdateNode(t *testing.T) {
|
||||
}
|
||||
for _, c := range cases {
|
||||
for _, strategy := range updateStrategies() {
|
||||
manager, podControl, _, err := newTestController()
|
||||
logger, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, _, err := newTestController(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -2462,7 +2508,7 @@ func TestUpdateNode(t *testing.T) {
|
||||
}
|
||||
|
||||
enqueued = false
|
||||
manager.updateNode(c.oldNode, c.newNode)
|
||||
manager.updateNode(logger, c.oldNode, c.newNode)
|
||||
if enqueued != c.shouldEnqueue {
|
||||
t.Errorf("Test case: '%s', expected: %t, got: %t", c.test, c.shouldEnqueue, enqueued)
|
||||
}
|
||||
@ -2612,7 +2658,8 @@ func TestDeleteNoDaemonPod(t *testing.T) {
|
||||
|
||||
for _, c := range cases {
|
||||
for _, strategy := range updateStrategies() {
|
||||
manager, podControl, _, err := newTestController()
|
||||
logger, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, _, err := newTestController(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -2639,7 +2686,7 @@ func TestDeleteNoDaemonPod(t *testing.T) {
|
||||
}
|
||||
|
||||
enqueued = false
|
||||
manager.deletePod(c.deletedPod)
|
||||
manager.deletePod(logger, c.deletedPod)
|
||||
if enqueued != c.shouldEnqueue {
|
||||
t.Errorf("Test case: '%s', expected: %t, got: %t", c.test, c.shouldEnqueue, enqueued)
|
||||
}
|
||||
@ -2651,7 +2698,8 @@ func TestDeleteUnscheduledPodForNotExistingNode(t *testing.T) {
|
||||
for _, strategy := range updateStrategies() {
|
||||
ds := newDaemonSet("foo")
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -2695,7 +2743,8 @@ func TestGetNodesToDaemonPods(t *testing.T) {
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
ds2 := newDaemonSet("foo2")
|
||||
ds2.Spec.UpdateStrategy = *strategy
|
||||
manager, _, _, err := newTestController(ds, ds2)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager, _, _, err := newTestController(ctx, ds, ds2)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -2762,7 +2811,8 @@ func TestGetNodesToDaemonPods(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestAddNode(t *testing.T) {
|
||||
manager, _, _, err := newTestController()
|
||||
logger, ctx := ktesting.NewTestContext(t)
|
||||
manager, _, _, err := newTestController(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -2773,14 +2823,13 @@ func TestAddNode(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
manager.addNode(node1)
|
||||
manager.addNode(logger, node1)
|
||||
if got, want := manager.queue.Len(), 0; got != want {
|
||||
t.Fatalf("queue.Len() = %v, want %v", got, want)
|
||||
}
|
||||
|
||||
node2 := newNode("node2", simpleNodeLabel)
|
||||
manager.addNode(node2)
|
||||
manager.addNode(logger, node2)
|
||||
if got, want := manager.queue.Len(), 1; got != want {
|
||||
t.Fatalf("queue.Len() = %v, want %v", got, want)
|
||||
}
|
||||
@ -2792,7 +2841,8 @@ func TestAddNode(t *testing.T) {
|
||||
|
||||
func TestAddPod(t *testing.T) {
|
||||
for _, strategy := range updateStrategies() {
|
||||
manager, _, _, err := newTestController()
|
||||
logger, ctx := ktesting.NewTestContext(t)
|
||||
manager, _, _, err := newTestController(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -2808,9 +2858,8 @@ func TestAddPod(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
pod1 := newPod("pod1-", "node-0", simpleDaemonSetLabel, ds1)
|
||||
manager.addPod(pod1)
|
||||
manager.addPod(logger, pod1)
|
||||
if got, want := manager.queue.Len(), 1; got != want {
|
||||
t.Fatalf("queue.Len() = %v, want %v", got, want)
|
||||
}
|
||||
@ -2824,7 +2873,7 @@ func TestAddPod(t *testing.T) {
|
||||
}
|
||||
|
||||
pod2 := newPod("pod2-", "node-0", simpleDaemonSetLabel, ds2)
|
||||
manager.addPod(pod2)
|
||||
manager.addPod(logger, pod2)
|
||||
if got, want := manager.queue.Len(), 1; got != want {
|
||||
t.Fatalf("queue.Len() = %v, want %v", got, want)
|
||||
}
|
||||
@ -2841,7 +2890,8 @@ func TestAddPod(t *testing.T) {
|
||||
|
||||
func TestAddPodOrphan(t *testing.T) {
|
||||
for _, strategy := range updateStrategies() {
|
||||
manager, _, _, err := newTestController()
|
||||
logger, ctx := ktesting.NewTestContext(t)
|
||||
manager, _, _, err := newTestController(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -2867,7 +2917,7 @@ func TestAddPodOrphan(t *testing.T) {
|
||||
|
||||
// Make pod an orphan. Expect matching sets to be queued.
|
||||
pod := newPod("pod1-", "node-0", simpleDaemonSetLabel, nil)
|
||||
manager.addPod(pod)
|
||||
manager.addPod(logger, pod)
|
||||
if got, want := manager.queue.Len(), 2; got != want {
|
||||
t.Fatalf("queue.Len() = %v, want %v", got, want)
|
||||
}
|
||||
@ -2879,7 +2929,8 @@ func TestAddPodOrphan(t *testing.T) {
|
||||
|
||||
func TestUpdatePod(t *testing.T) {
|
||||
for _, strategy := range updateStrategies() {
|
||||
manager, _, _, err := newTestController()
|
||||
logger, ctx := ktesting.NewTestContext(t)
|
||||
manager, _, _, err := newTestController(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -2899,7 +2950,7 @@ func TestUpdatePod(t *testing.T) {
|
||||
pod1 := newPod("pod1-", "node-0", simpleDaemonSetLabel, ds1)
|
||||
prev := *pod1
|
||||
bumpResourceVersion(pod1)
|
||||
manager.updatePod(&prev, pod1)
|
||||
manager.updatePod(logger, &prev, pod1)
|
||||
if got, want := manager.queue.Len(), 1; got != want {
|
||||
t.Fatalf("queue.Len() = %v, want %v", got, want)
|
||||
}
|
||||
@ -2915,7 +2966,7 @@ func TestUpdatePod(t *testing.T) {
|
||||
pod2 := newPod("pod2-", "node-0", simpleDaemonSetLabel, ds2)
|
||||
prev = *pod2
|
||||
bumpResourceVersion(pod2)
|
||||
manager.updatePod(&prev, pod2)
|
||||
manager.updatePod(logger, &prev, pod2)
|
||||
if got, want := manager.queue.Len(), 1; got != want {
|
||||
t.Fatalf("queue.Len() = %v, want %v", got, want)
|
||||
}
|
||||
@ -2932,7 +2983,8 @@ func TestUpdatePod(t *testing.T) {
|
||||
|
||||
func TestUpdatePodOrphanSameLabels(t *testing.T) {
|
||||
for _, strategy := range updateStrategies() {
|
||||
manager, _, _, err := newTestController()
|
||||
logger, ctx := ktesting.NewTestContext(t)
|
||||
manager, _, _, err := newTestController(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -2952,7 +3004,7 @@ func TestUpdatePodOrphanSameLabels(t *testing.T) {
|
||||
pod := newPod("pod1-", "node-0", simpleDaemonSetLabel, nil)
|
||||
prev := *pod
|
||||
bumpResourceVersion(pod)
|
||||
manager.updatePod(&prev, pod)
|
||||
manager.updatePod(logger, &prev, pod)
|
||||
if got, want := manager.queue.Len(), 0; got != want {
|
||||
t.Fatalf("queue.Len() = %v, want %v", got, want)
|
||||
}
|
||||
@ -2961,7 +3013,8 @@ func TestUpdatePodOrphanSameLabels(t *testing.T) {
|
||||
|
||||
func TestUpdatePodOrphanWithNewLabels(t *testing.T) {
|
||||
for _, strategy := range updateStrategies() {
|
||||
manager, _, _, err := newTestController()
|
||||
logger, ctx := ktesting.NewTestContext(t)
|
||||
manager, _, _, err := newTestController(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -2982,7 +3035,7 @@ func TestUpdatePodOrphanWithNewLabels(t *testing.T) {
|
||||
prev := *pod
|
||||
prev.Labels = map[string]string{"foo2": "bar2"}
|
||||
bumpResourceVersion(pod)
|
||||
manager.updatePod(&prev, pod)
|
||||
manager.updatePod(logger, &prev, pod)
|
||||
if got, want := manager.queue.Len(), 2; got != want {
|
||||
t.Fatalf("queue.Len() = %v, want %v", got, want)
|
||||
}
|
||||
@ -2996,7 +3049,8 @@ func TestUpdatePodChangeControllerRef(t *testing.T) {
|
||||
for _, strategy := range updateStrategies() {
|
||||
ds := newDaemonSet("foo")
|
||||
ds.Spec.UpdateStrategy = *strategy
|
||||
manager, _, _, err := newTestController()
|
||||
logger, ctx := ktesting.NewTestContext(t)
|
||||
manager, _, _, err := newTestController(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -3015,7 +3069,7 @@ func TestUpdatePodChangeControllerRef(t *testing.T) {
|
||||
prev := *pod
|
||||
prev.OwnerReferences = []metav1.OwnerReference{*metav1.NewControllerRef(ds2, controllerKind)}
|
||||
bumpResourceVersion(pod)
|
||||
manager.updatePod(&prev, pod)
|
||||
manager.updatePod(logger, &prev, pod)
|
||||
if got, want := manager.queue.Len(), 2; got != want {
|
||||
t.Fatalf("queue.Len() = %v, want %v", got, want)
|
||||
}
|
||||
@ -3024,7 +3078,8 @@ func TestUpdatePodChangeControllerRef(t *testing.T) {
|
||||
|
||||
func TestUpdatePodControllerRefRemoved(t *testing.T) {
|
||||
for _, strategy := range updateStrategies() {
|
||||
manager, _, _, err := newTestController()
|
||||
logger, ctx := ktesting.NewTestContext(t)
|
||||
manager, _, _, err := newTestController(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -3045,7 +3100,7 @@ func TestUpdatePodControllerRefRemoved(t *testing.T) {
|
||||
prev := *pod
|
||||
pod.OwnerReferences = nil
|
||||
bumpResourceVersion(pod)
|
||||
manager.updatePod(&prev, pod)
|
||||
manager.updatePod(logger, &prev, pod)
|
||||
if got, want := manager.queue.Len(), 2; got != want {
|
||||
t.Fatalf("queue.Len() = %v, want %v", got, want)
|
||||
}
|
||||
@ -3054,7 +3109,8 @@ func TestUpdatePodControllerRefRemoved(t *testing.T) {
|
||||
|
||||
func TestDeletePod(t *testing.T) {
|
||||
for _, strategy := range updateStrategies() {
|
||||
manager, _, _, err := newTestController()
|
||||
logger, ctx := ktesting.NewTestContext(t)
|
||||
manager, _, _, err := newTestController(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -3072,7 +3128,7 @@ func TestDeletePod(t *testing.T) {
|
||||
}
|
||||
|
||||
pod1 := newPod("pod1-", "node-0", simpleDaemonSetLabel, ds1)
|
||||
manager.deletePod(pod1)
|
||||
manager.deletePod(logger, pod1)
|
||||
if got, want := manager.queue.Len(), 1; got != want {
|
||||
t.Fatalf("queue.Len() = %v, want %v", got, want)
|
||||
}
|
||||
@ -3086,7 +3142,7 @@ func TestDeletePod(t *testing.T) {
|
||||
}
|
||||
|
||||
pod2 := newPod("pod2-", "node-0", simpleDaemonSetLabel, ds2)
|
||||
manager.deletePod(pod2)
|
||||
manager.deletePod(logger, pod2)
|
||||
if got, want := manager.queue.Len(), 1; got != want {
|
||||
t.Fatalf("queue.Len() = %v, want %v", got, want)
|
||||
}
|
||||
@ -3103,7 +3159,8 @@ func TestDeletePod(t *testing.T) {
|
||||
|
||||
func TestDeletePodOrphan(t *testing.T) {
|
||||
for _, strategy := range updateStrategies() {
|
||||
manager, _, _, err := newTestController()
|
||||
logger, ctx := ktesting.NewTestContext(t)
|
||||
manager, _, _, err := newTestController(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -3128,7 +3185,7 @@ func TestDeletePodOrphan(t *testing.T) {
|
||||
}
|
||||
|
||||
pod := newPod("pod1-", "node-0", simpleDaemonSetLabel, nil)
|
||||
manager.deletePod(pod)
|
||||
manager.deletePod(logger, pod)
|
||||
if got, want := manager.queue.Len(), 0; got != want {
|
||||
t.Fatalf("queue.Len() = %v, want %v", got, want)
|
||||
}
|
||||
@ -3158,9 +3215,10 @@ func getQueuedKeys(queue workqueue.RateLimitingInterface) []string {
|
||||
|
||||
// Controller should not create pods on nodes which have daemon pods, and should remove excess pods from nodes that have extra pods.
|
||||
func TestSurgeDealsWithExistingPods(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ds := newDaemonSet("foo")
|
||||
ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(1))
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -3174,9 +3232,10 @@ func TestSurgeDealsWithExistingPods(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSurgePreservesReadyOldPods(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ds := newDaemonSet("foo")
|
||||
ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(1))
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -3213,9 +3272,10 @@ func TestSurgePreservesReadyOldPods(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSurgeCreatesNewPodWhenAtMaxSurgeAndOldPodDeleted(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ds := newDaemonSet("foo")
|
||||
ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(1))
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -3259,9 +3319,10 @@ func TestSurgeCreatesNewPodWhenAtMaxSurgeAndOldPodDeleted(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSurgeDeletesUnreadyOldPods(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ds := newDaemonSet("foo")
|
||||
ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(1))
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -3298,10 +3359,11 @@ func TestSurgeDeletesUnreadyOldPods(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSurgePreservesOldReadyWithUnsatisfiedMinReady(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ds := newDaemonSet("foo")
|
||||
ds.Spec.MinReadySeconds = 15
|
||||
ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(1))
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -3342,10 +3404,11 @@ func TestSurgePreservesOldReadyWithUnsatisfiedMinReady(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSurgeDeletesOldReadyWithUnsatisfiedMinReady(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ds := newDaemonSet("foo")
|
||||
ds.Spec.MinReadySeconds = 15
|
||||
ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(1))
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
|
@ -17,9 +17,5 @@ limitations under the License.
|
||||
package daemon
|
||||
|
||||
import (
|
||||
"k8s.io/klog/v2"
|
||||
_ "k8s.io/klog/v2/ktesting/init"
|
||||
)
|
||||
|
||||
func init() {
|
||||
klog.InitFlags(nil)
|
||||
}
|
||||
|
@ -42,11 +42,12 @@ import (
|
||||
// rollingUpdate identifies the set of old pods to delete, or additional pods to create on nodes,
|
||||
// remaining within the constraints imposed by the update strategy.
|
||||
func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
|
||||
logger := klog.FromContext(ctx)
|
||||
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds)
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
|
||||
}
|
||||
maxSurge, maxUnavailable, err := dsc.updatedDesiredNodeCounts(ds, nodeList, nodeToDaemonPods)
|
||||
maxSurge, maxUnavailable, err := dsc.updatedDesiredNodeCounts(ctx, ds, nodeList, nodeToDaemonPods)
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't get unavailable numbers: %v", err)
|
||||
}
|
||||
@ -73,7 +74,7 @@ func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.Dae
|
||||
newPod, oldPod, ok := findUpdatedPodsOnNode(ds, pods, hash)
|
||||
if !ok {
|
||||
// let the manage loop clean up this node, and treat it as an unavailable node
|
||||
klog.V(3).Infof("DaemonSet %s/%s has excess pods on node %s, skipping to allow the core loop to process", ds.Namespace, ds.Name, nodeName)
|
||||
logger.V(3).Info("DaemonSet has excess pods on node, skipping to allow the core loop to process", "daemonset", klog.KObj(ds), "node", klog.KRef("", nodeName))
|
||||
numUnavailable++
|
||||
continue
|
||||
}
|
||||
@ -92,7 +93,7 @@ func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.Dae
|
||||
switch {
|
||||
case !podutil.IsPodAvailable(oldPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}):
|
||||
// the old pod isn't available, so it needs to be replaced
|
||||
klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is out of date and not available, allowing replacement", ds.Namespace, ds.Name, oldPod.Name, nodeName)
|
||||
logger.V(5).Info("DaemonSet pod on node is out of date and not available, allowing replacement", "daemonset", klog.KObj(ds), "pod", klog.KObj(oldPod), "node", klog.KRef("", nodeName))
|
||||
// record the replacement
|
||||
if allowedReplacementPods == nil {
|
||||
allowedReplacementPods = make([]string, 0, len(nodeToDaemonPods))
|
||||
@ -102,7 +103,7 @@ func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.Dae
|
||||
// no point considering any other candidates
|
||||
continue
|
||||
default:
|
||||
klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is out of date, this is a candidate to replace", ds.Namespace, ds.Name, oldPod.Name, nodeName)
|
||||
logger.V(5).Info("DaemonSet pod on node is out of date, this is a candidate to replace", "daemonset", klog.KObj(ds), "pod", klog.KObj(oldPod), "node", klog.KRef("", nodeName))
|
||||
// record the candidate
|
||||
if candidatePodsToDelete == nil {
|
||||
candidatePodsToDelete = make([]string, 0, maxUnavailable)
|
||||
@ -113,7 +114,7 @@ func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.Dae
|
||||
}
|
||||
|
||||
// use any of the candidates we can, including the allowedReplacemnntPods
|
||||
klog.V(5).Infof("DaemonSet %s/%s allowing %d replacements, up to %d unavailable, %d new are unavailable, %d candidates", ds.Namespace, ds.Name, len(allowedReplacementPods), maxUnavailable, numUnavailable, len(candidatePodsToDelete))
|
||||
logger.V(5).Info("DaemonSet allowing replacements", "daemonset", klog.KObj(ds), "replacements", len(allowedReplacementPods), "maxUnavailable", maxUnavailable, "numUnavailable", numUnavailable, "candidates", len(candidatePodsToDelete))
|
||||
remainingUnavailable := maxUnavailable - numUnavailable
|
||||
if remainingUnavailable < 0 {
|
||||
remainingUnavailable = 0
|
||||
@ -148,7 +149,7 @@ func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.Dae
|
||||
newPod, oldPod, ok := findUpdatedPodsOnNode(ds, pods, hash)
|
||||
if !ok {
|
||||
// let the manage loop clean up this node, and treat it as a surge node
|
||||
klog.V(3).Infof("DaemonSet %s/%s has excess pods on node %s, skipping to allow the core loop to process", ds.Namespace, ds.Name, nodeName)
|
||||
logger.V(3).Info("DaemonSet has excess pods on node, skipping to allow the core loop to process", "daemonset", klog.KObj(ds), "node", klog.KRef("", nodeName))
|
||||
numSurge++
|
||||
continue
|
||||
}
|
||||
@ -160,7 +161,7 @@ func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.Dae
|
||||
switch {
|
||||
case !podutil.IsPodAvailable(oldPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}):
|
||||
// the old pod isn't available, allow it to become a replacement
|
||||
klog.V(5).Infof("Pod %s on node %s is out of date and not available, allowing replacement", ds.Namespace, ds.Name, oldPod.Name, nodeName)
|
||||
logger.V(5).Info("Pod on node is out of date and not available, allowing replacement", "daemonset", klog.KObj(ds), "pod", klog.KObj(oldPod), "node", klog.KRef("", nodeName))
|
||||
// record the replacement
|
||||
if allowedNewNodes == nil {
|
||||
allowedNewNodes = make([]string, 0, len(nodeToDaemonPods))
|
||||
@ -170,7 +171,7 @@ func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.Dae
|
||||
// no point considering any other candidates
|
||||
continue
|
||||
default:
|
||||
klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is out of date, this is a surge candidate", ds.Namespace, ds.Name, oldPod.Name, nodeName)
|
||||
logger.V(5).Info("DaemonSet pod on node is out of date, this is a surge candidate", "daemonset", klog.KObj(ds), "pod", klog.KObj(oldPod), "node", klog.KRef("", nodeName))
|
||||
// record the candidate
|
||||
if candidateNewNodes == nil {
|
||||
candidateNewNodes = make([]string, 0, maxSurge)
|
||||
@ -185,13 +186,13 @@ func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.Dae
|
||||
continue
|
||||
}
|
||||
// we're available, delete the old pod
|
||||
klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is available, remove %s", ds.Namespace, ds.Name, newPod.Name, nodeName, oldPod.Name)
|
||||
logger.V(5).Info("DaemonSet pod on node is available, remove old pod", "daemonset", klog.KObj(ds), "newPod", klog.KObj(newPod), "node", nodeName, "oldPod", klog.KObj(oldPod))
|
||||
oldPodsToDelete = append(oldPodsToDelete, oldPod.Name)
|
||||
}
|
||||
}
|
||||
|
||||
// use any of the candidates we can, including the allowedNewNodes
|
||||
klog.V(5).Infof("DaemonSet %s/%s allowing %d replacements, surge up to %d, %d are in progress, %d candidates", ds.Namespace, ds.Name, len(allowedNewNodes), maxSurge, numSurge, len(candidateNewNodes))
|
||||
logger.V(5).Info("DaemonSet allowing replacements", "daemonset", klog.KObj(ds), "replacements", len(allowedNewNodes), "maxSurge", maxSurge, "numSurge", numSurge, "candidates", len(candidateNewNodes))
|
||||
remainingSurge := maxSurge - numSurge
|
||||
if remainingSurge < 0 {
|
||||
remainingSurge = 0
|
||||
@ -484,6 +485,7 @@ func (dsc *DaemonSetsController) snapshot(ctx context.Context, ds *apps.DaemonSe
|
||||
|
||||
history, err = dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Create(ctx, history, metav1.CreateOptions{})
|
||||
if outerErr := err; errors.IsAlreadyExists(outerErr) {
|
||||
logger := klog.FromContext(ctx)
|
||||
// TODO: Is it okay to get from historyLister?
|
||||
existedHistory, getErr := dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Get(ctx, name, metav1.GetOptions{})
|
||||
if getErr != nil {
|
||||
@ -516,7 +518,7 @@ func (dsc *DaemonSetsController) snapshot(ctx context.Context, ds *apps.DaemonSe
|
||||
if updateErr != nil {
|
||||
return nil, updateErr
|
||||
}
|
||||
klog.V(2).Infof("Found a hash collision for DaemonSet %q - bumping collisionCount to %d to resolve it", ds.Name, *currDS.Status.CollisionCount)
|
||||
logger.V(2).Info("Found a hash collision for DaemonSet - bumping collisionCount to resolve it", "daemonset", klog.KObj(ds), "collisionCount", *currDS.Status.CollisionCount)
|
||||
return nil, outerErr
|
||||
}
|
||||
return history, err
|
||||
@ -524,8 +526,9 @@ func (dsc *DaemonSetsController) snapshot(ctx context.Context, ds *apps.DaemonSe
|
||||
|
||||
// updatedDesiredNodeCounts calculates the true number of allowed unavailable or surge pods and
|
||||
// updates the nodeToDaemonPods array to include an empty array for every node that is not scheduled.
|
||||
func (dsc *DaemonSetsController) updatedDesiredNodeCounts(ds *apps.DaemonSet, nodeList []*v1.Node, nodeToDaemonPods map[string][]*v1.Pod) (int, int, error) {
|
||||
func (dsc *DaemonSetsController) updatedDesiredNodeCounts(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, nodeToDaemonPods map[string][]*v1.Pod) (int, int, error) {
|
||||
var desiredNumberScheduled int
|
||||
logger := klog.FromContext(ctx)
|
||||
for i := range nodeList {
|
||||
node := nodeList[i]
|
||||
wantToRun, _ := NodeShouldRunDaemonPod(node, ds)
|
||||
@ -552,10 +555,10 @@ func (dsc *DaemonSetsController) updatedDesiredNodeCounts(ds *apps.DaemonSet, no
|
||||
// if the daemonset returned with an impossible configuration, obey the default of unavailable=1 (in the
|
||||
// event the apiserver returns 0 for both surge and unavailability)
|
||||
if desiredNumberScheduled > 0 && maxUnavailable == 0 && maxSurge == 0 {
|
||||
klog.Warningf("DaemonSet %s/%s is not configured for surge or unavailability, defaulting to accepting unavailability", ds.Namespace, ds.Name)
|
||||
logger.Info("DaemonSet is not configured for surge or unavailability, defaulting to accepting unavailability", "daemonset", klog.KObj(ds))
|
||||
maxUnavailable = 1
|
||||
}
|
||||
klog.V(5).Infof("DaemonSet %s/%s, maxSurge: %d, maxUnavailable: %d", ds.Namespace, ds.Name, maxSurge, maxUnavailable)
|
||||
logger.V(5).Info("DaemonSet with maxSurge and maxUnavailable", "daemonset", klog.KObj(ds), "maxSurge", maxSurge, "maxUnavailable", maxUnavailable)
|
||||
return maxSurge, maxUnavailable, nil
|
||||
}
|
||||
|
||||
|
@ -27,15 +27,16 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
||||
"k8s.io/kubernetes/pkg/controller/daemon/util"
|
||||
testingclock "k8s.io/utils/clock/testing"
|
||||
)
|
||||
|
||||
func TestDaemonSetUpdatesPods(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ds := newDaemonSet("foo")
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -75,8 +76,9 @@ func TestDaemonSetUpdatesPods(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDaemonSetUpdatesPodsWithMaxSurge(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ds := newDaemonSet("foo")
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -116,8 +118,9 @@ func TestDaemonSetUpdatesPodsWithMaxSurge(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDaemonSetUpdatesWhenNewPosIsNotReady(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ds := newDaemonSet("foo")
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -152,8 +155,9 @@ func TestDaemonSetUpdatesWhenNewPosIsNotReady(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDaemonSetUpdatesAllOldPodsNotReady(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ds := newDaemonSet("foo")
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -187,8 +191,9 @@ func TestDaemonSetUpdatesAllOldPodsNotReady(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDaemonSetUpdatesAllOldPodsNotReadyMaxSurge(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ds := newDaemonSet("foo")
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -284,6 +289,7 @@ func podsByNodeMatchingHash(dsc *daemonSetsController, hash string) map[string][
|
||||
|
||||
func setPodReadiness(t *testing.T, dsc *daemonSetsController, ready bool, count int, fn func(*v1.Pod) bool) {
|
||||
t.Helper()
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
for _, obj := range dsc.podStore.List() {
|
||||
if count <= 0 {
|
||||
break
|
||||
@ -310,7 +316,7 @@ func setPodReadiness(t *testing.T, dsc *daemonSetsController, ready bool, count
|
||||
// TODO: workaround UpdatePodCondition calling time.Now() directly
|
||||
setCondition := podutil.GetPodReadyCondition(pod.Status)
|
||||
setCondition.LastTransitionTime.Time = dsc.failedPodsBackoff.Clock.Now()
|
||||
klog.Infof("marked pod %s ready=%t", pod.Name, ready)
|
||||
logger.Info("marked pod ready", "pod", pod.Name, "ready", ready)
|
||||
count--
|
||||
}
|
||||
if count > 0 {
|
||||
@ -329,8 +335,9 @@ func currentDSHash(dsc *daemonSetsController, ds *apps.DaemonSet) (string, error
|
||||
}
|
||||
|
||||
func TestDaemonSetUpdatesNoTemplateChanged(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ds := newDaemonSet("foo")
|
||||
manager, podControl, _, err := newTestController(ds)
|
||||
manager, podControl, _, err := newTestController(ctx, ds)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
@ -373,7 +380,7 @@ func newUpdateUnavailable(value intstr.IntOrString) apps.DaemonSetUpdateStrategy
|
||||
func TestGetUnavailableNumbers(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
Manager *daemonSetsController
|
||||
ManagerFunc func(ctx context.Context) *daemonSetsController
|
||||
ds *apps.DaemonSet
|
||||
nodeToPods map[string][]*v1.Pod
|
||||
maxSurge int
|
||||
@ -383,13 +390,13 @@ func TestGetUnavailableNumbers(t *testing.T) {
|
||||
}{
|
||||
{
|
||||
name: "No nodes",
|
||||
Manager: func() *daemonSetsController {
|
||||
manager, _, _, err := newTestController()
|
||||
ManagerFunc: func(ctx context.Context) *daemonSetsController {
|
||||
manager, _, _, err := newTestController(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
return manager
|
||||
}(),
|
||||
},
|
||||
ds: func() *apps.DaemonSet {
|
||||
ds := newDaemonSet("x")
|
||||
ds.Spec.UpdateStrategy = newUpdateUnavailable(intstr.FromInt(0))
|
||||
@ -401,14 +408,14 @@ func TestGetUnavailableNumbers(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "Two nodes with ready pods",
|
||||
Manager: func() *daemonSetsController {
|
||||
manager, _, _, err := newTestController()
|
||||
ManagerFunc: func(ctx context.Context) *daemonSetsController {
|
||||
manager, _, _, err := newTestController(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
addNodes(manager.nodeStore, 0, 2, nil)
|
||||
return manager
|
||||
}(),
|
||||
},
|
||||
ds: func() *apps.DaemonSet {
|
||||
ds := newDaemonSet("x")
|
||||
ds.Spec.UpdateStrategy = newUpdateUnavailable(intstr.FromInt(1))
|
||||
@ -429,14 +436,14 @@ func TestGetUnavailableNumbers(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "Two nodes, one node without pods",
|
||||
Manager: func() *daemonSetsController {
|
||||
manager, _, _, err := newTestController()
|
||||
ManagerFunc: func(ctx context.Context) *daemonSetsController {
|
||||
manager, _, _, err := newTestController(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
addNodes(manager.nodeStore, 0, 2, nil)
|
||||
return manager
|
||||
}(),
|
||||
},
|
||||
ds: func() *apps.DaemonSet {
|
||||
ds := newDaemonSet("x")
|
||||
ds.Spec.UpdateStrategy = newUpdateUnavailable(intstr.FromInt(0))
|
||||
@ -454,14 +461,14 @@ func TestGetUnavailableNumbers(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "Two nodes, one node without pods, surge",
|
||||
Manager: func() *daemonSetsController {
|
||||
manager, _, _, err := newTestController()
|
||||
ManagerFunc: func(ctx context.Context) *daemonSetsController {
|
||||
manager, _, _, err := newTestController(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
addNodes(manager.nodeStore, 0, 2, nil)
|
||||
return manager
|
||||
}(),
|
||||
},
|
||||
ds: func() *apps.DaemonSet {
|
||||
ds := newDaemonSet("x")
|
||||
ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(0))
|
||||
@ -479,14 +486,14 @@ func TestGetUnavailableNumbers(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "Two nodes with pods, MaxUnavailable in percents",
|
||||
Manager: func() *daemonSetsController {
|
||||
manager, _, _, err := newTestController()
|
||||
ManagerFunc: func(ctx context.Context) *daemonSetsController {
|
||||
manager, _, _, err := newTestController(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
addNodes(manager.nodeStore, 0, 2, nil)
|
||||
return manager
|
||||
}(),
|
||||
},
|
||||
ds: func() *apps.DaemonSet {
|
||||
ds := newDaemonSet("x")
|
||||
ds.Spec.UpdateStrategy = newUpdateUnavailable(intstr.FromString("50%"))
|
||||
@ -507,14 +514,14 @@ func TestGetUnavailableNumbers(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "Two nodes with pods, MaxUnavailable in percents, surge",
|
||||
Manager: func() *daemonSetsController {
|
||||
manager, _, _, err := newTestController()
|
||||
ManagerFunc: func(ctx context.Context) *daemonSetsController {
|
||||
manager, _, _, err := newTestController(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
addNodes(manager.nodeStore, 0, 2, nil)
|
||||
return manager
|
||||
}(),
|
||||
},
|
||||
ds: func() *apps.DaemonSet {
|
||||
ds := newDaemonSet("x")
|
||||
ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromString("50%"))
|
||||
@ -536,14 +543,14 @@ func TestGetUnavailableNumbers(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "Two nodes with pods, MaxUnavailable is 100%, surge",
|
||||
Manager: func() *daemonSetsController {
|
||||
manager, _, _, err := newTestController()
|
||||
ManagerFunc: func(ctx context.Context) *daemonSetsController {
|
||||
manager, _, _, err := newTestController(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
addNodes(manager.nodeStore, 0, 2, nil)
|
||||
return manager
|
||||
}(),
|
||||
},
|
||||
ds: func() *apps.DaemonSet {
|
||||
ds := newDaemonSet("x")
|
||||
ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromString("100%"))
|
||||
@ -565,14 +572,14 @@ func TestGetUnavailableNumbers(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "Two nodes with pods, MaxUnavailable in percents, pod terminating",
|
||||
Manager: func() *daemonSetsController {
|
||||
manager, _, _, err := newTestController()
|
||||
ManagerFunc: func(ctx context.Context) *daemonSetsController {
|
||||
manager, _, _, err := newTestController(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
addNodes(manager.nodeStore, 0, 3, nil)
|
||||
return manager
|
||||
}(),
|
||||
},
|
||||
ds: func() *apps.DaemonSet {
|
||||
ds := newDaemonSet("x")
|
||||
ds.Spec.UpdateStrategy = newUpdateUnavailable(intstr.FromString("50%"))
|
||||
@ -597,12 +604,14 @@ func TestGetUnavailableNumbers(t *testing.T) {
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
c.Manager.dsStore.Add(c.ds)
|
||||
nodeList, err := c.Manager.nodeLister.List(labels.Everything())
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager := c.ManagerFunc(ctx)
|
||||
manager.dsStore.Add(c.ds)
|
||||
nodeList, err := manager.nodeLister.List(labels.Everything())
|
||||
if err != nil {
|
||||
t.Fatalf("error listing nodes: %v", err)
|
||||
}
|
||||
maxSurge, maxUnavailable, err := c.Manager.updatedDesiredNodeCounts(c.ds, nodeList, c.nodeToPods)
|
||||
maxSurge, maxUnavailable, err := manager.updatedDesiredNodeCounts(ctx, c.ds, nodeList, c.nodeToPods)
|
||||
if err != nil && c.Err != nil {
|
||||
if c.Err != err {
|
||||
t.Fatalf("Expected error: %v but got: %v", c.Err, err)
|
||||
@ -635,42 +644,44 @@ func TestControlledHistories(t *testing.T) {
|
||||
orphanCrNotInSameNsWithDs1 := newControllerRevision(ds1.GetName()+"-x3", ds1.GetNamespace()+"-other", ds1.Spec.Template.Labels, nil)
|
||||
cases := []struct {
|
||||
name string
|
||||
manager *daemonSetsController
|
||||
managerFunc func(ctx context.Context) *daemonSetsController
|
||||
historyCRAll []*apps.ControllerRevision
|
||||
expectControllerRevisions []*apps.ControllerRevision
|
||||
}{
|
||||
{
|
||||
name: "controller revision in the same namespace",
|
||||
manager: func() *daemonSetsController {
|
||||
manager, _, _, err := newTestController(ds1, crOfDs1, orphanCrInSameNsWithDs1)
|
||||
managerFunc: func(ctx context.Context) *daemonSetsController {
|
||||
manager, _, _, err := newTestController(ctx, ds1, crOfDs1, orphanCrInSameNsWithDs1)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
manager.dsStore.Add(ds1)
|
||||
return manager
|
||||
}(),
|
||||
},
|
||||
historyCRAll: []*apps.ControllerRevision{crOfDs1, orphanCrInSameNsWithDs1},
|
||||
expectControllerRevisions: []*apps.ControllerRevision{crOfDs1, orphanCrInSameNsWithDs1},
|
||||
},
|
||||
{
|
||||
name: "Skip adopting the controller revision in namespace other than the one in which DS lives",
|
||||
manager: func() *daemonSetsController {
|
||||
manager, _, _, err := newTestController(ds1, orphanCrNotInSameNsWithDs1)
|
||||
managerFunc: func(ctx context.Context) *daemonSetsController {
|
||||
manager, _, _, err := newTestController(ctx, ds1, orphanCrNotInSameNsWithDs1)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
manager.dsStore.Add(ds1)
|
||||
return manager
|
||||
}(),
|
||||
},
|
||||
historyCRAll: []*apps.ControllerRevision{orphanCrNotInSameNsWithDs1},
|
||||
expectControllerRevisions: []*apps.ControllerRevision{},
|
||||
},
|
||||
}
|
||||
for _, c := range cases {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
manager := c.managerFunc(ctx)
|
||||
for _, h := range c.historyCRAll {
|
||||
c.manager.historyStore.Add(h)
|
||||
manager.historyStore.Add(h)
|
||||
}
|
||||
crList, err := c.manager.controlledHistories(context.TODO(), ds1)
|
||||
crList, err := manager.controlledHistories(context.TODO(), ds1)
|
||||
if err != nil {
|
||||
t.Fatalf("Test case: %s. Unexpected error: %v", c.name, err)
|
||||
}
|
||||
|
@ -39,6 +39,7 @@ import (
|
||||
"k8s.io/client-go/tools/events"
|
||||
"k8s.io/client-go/util/flowcontrol"
|
||||
"k8s.io/client-go/util/retry"
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
|
||||
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
|
||||
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
||||
@ -75,8 +76,11 @@ func setupWithServerSetup(t *testing.T, serverSetup framework.TestServerSetup) (
|
||||
clientSet, config, closeFn := framework.StartTestServer(t, serverSetup)
|
||||
|
||||
resyncPeriod := 12 * time.Hour
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "daemonset-informers")), resyncPeriod)
|
||||
dc, err := daemon.NewDaemonSetsController(
|
||||
ctx,
|
||||
informers.Apps().V1().DaemonSets(),
|
||||
informers.Apps().V1().ControllerRevisions(),
|
||||
informers.Core().V1().Pods(),
|
||||
@ -88,8 +92,6 @@ func setupWithServerSetup(t *testing.T, serverSetup framework.TestServerSetup) (
|
||||
t.Fatalf("error creating DaemonSets controller: %v", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
|
||||
Interface: clientSet.EventsV1(),
|
||||
})
|
||||
|
Loading…
Reference in New Issue
Block a user