daemonset: use contextual logging

This commit is contained in:
ZhangKe10140699 2022-11-04 16:37:32 +08:00
parent b740a34302
commit 7198bcffcd
7 changed files with 326 additions and 219 deletions

View File

@ -34,7 +34,9 @@ import (
)
func startDaemonSetController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "daemonset-controller"))
dsc, err := daemon.NewDaemonSetsController(
ctx,
controllerContext.InformerFactory.Apps().V1().DaemonSets(),
controllerContext.InformerFactory.Apps().V1().ControllerRevisions(),
controllerContext.InformerFactory.Core().V1().Pods(),

View File

@ -130,6 +130,7 @@ type DaemonSetsController struct {
// NewDaemonSetsController creates a new DaemonSetsController
func NewDaemonSetsController(
ctx context.Context,
daemonSetInformer appsinformers.DaemonSetInformer,
historyInformer appsinformers.ControllerRevisionInformer,
podInformer coreinformers.PodInformer,
@ -138,7 +139,7 @@ func NewDaemonSetsController(
failedPodsBackoff *flowcontrol.Backoff,
) (*DaemonSetsController, error) {
eventBroadcaster := record.NewBroadcaster()
logger := klog.FromContext(ctx)
dsc := &DaemonSetsController{
kubeClient: kubeClient,
eventBroadcaster: eventBroadcaster,
@ -156,17 +157,29 @@ func NewDaemonSetsController(
}
daemonSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dsc.addDaemonset,
UpdateFunc: dsc.updateDaemonset,
DeleteFunc: dsc.deleteDaemonset,
AddFunc: func(obj interface{}) {
dsc.addDaemonset(logger, obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
dsc.updateDaemonset(logger, oldObj, newObj)
},
DeleteFunc: func(obj interface{}) {
dsc.deleteDaemonset(logger, obj)
},
})
dsc.dsLister = daemonSetInformer.Lister()
dsc.dsStoreSynced = daemonSetInformer.Informer().HasSynced
historyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dsc.addHistory,
UpdateFunc: dsc.updateHistory,
DeleteFunc: dsc.deleteHistory,
AddFunc: func(obj interface{}) {
dsc.addHistory(logger, obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
dsc.updateHistory(logger, oldObj, newObj)
},
DeleteFunc: func(obj interface{}) {
dsc.deleteHistory(logger, obj)
},
})
dsc.historyLister = historyInformer.Lister()
dsc.historyStoreSynced = historyInformer.Informer().HasSynced
@ -174,16 +187,26 @@ func NewDaemonSetsController(
// Watch for creation/deletion of pods. The reason we watch is that we don't want a daemon set to create/delete
// more pods until all the effects (expectations) of a daemon set's create/delete have been observed.
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dsc.addPod,
UpdateFunc: dsc.updatePod,
DeleteFunc: dsc.deletePod,
AddFunc: func(obj interface{}) {
dsc.addPod(logger, obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
dsc.updatePod(logger, oldObj, newObj)
},
DeleteFunc: func(obj interface{}) {
dsc.deletePod(logger, obj)
},
})
dsc.podLister = podInformer.Lister()
dsc.podStoreSynced = podInformer.Informer().HasSynced
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dsc.addNode,
UpdateFunc: dsc.updateNode,
AddFunc: func(obj interface{}) {
dsc.addNode(logger, obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
dsc.updateNode(logger, oldObj, newObj)
},
},
)
dsc.nodeStoreSynced = nodeInformer.Informer().HasSynced
@ -197,13 +220,13 @@ func NewDaemonSetsController(
return dsc, nil
}
func (dsc *DaemonSetsController) addDaemonset(obj interface{}) {
func (dsc *DaemonSetsController) addDaemonset(logger klog.Logger, obj interface{}) {
ds := obj.(*apps.DaemonSet)
klog.V(4).Infof("Adding daemon set %s", ds.Name)
logger.V(4).Info("Adding daemon set", "daemonset", klog.KObj(ds))
dsc.enqueueDaemonSet(ds)
}
func (dsc *DaemonSetsController) updateDaemonset(cur, old interface{}) {
func (dsc *DaemonSetsController) updateDaemonset(logger klog.Logger, cur, old interface{}) {
oldDS := old.(*apps.DaemonSet)
curDS := cur.(*apps.DaemonSet)
@ -214,17 +237,17 @@ func (dsc *DaemonSetsController) updateDaemonset(cur, old interface{}) {
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", oldDS, err))
return
}
dsc.deleteDaemonset(cache.DeletedFinalStateUnknown{
dsc.deleteDaemonset(logger, cache.DeletedFinalStateUnknown{
Key: key,
Obj: oldDS,
})
}
klog.V(4).Infof("Updating daemon set %s", oldDS.Name)
logger.V(4).Info("Updating daemon set", "daemonset", klog.KObj(oldDS))
dsc.enqueueDaemonSet(curDS)
}
func (dsc *DaemonSetsController) deleteDaemonset(obj interface{}) {
func (dsc *DaemonSetsController) deleteDaemonset(logger klog.Logger, obj interface{}) {
ds, ok := obj.(*apps.DaemonSet)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
@ -238,7 +261,7 @@ func (dsc *DaemonSetsController) deleteDaemonset(obj interface{}) {
return
}
}
klog.V(4).Infof("Deleting daemon set %s", ds.Name)
logger.V(4).Info("Deleting daemon set", "daemonset", klog.KObj(ds))
key, err := controller.KeyFunc(ds)
if err != nil {
@ -262,8 +285,9 @@ func (dsc *DaemonSetsController) Run(ctx context.Context, workers int) {
defer dsc.queue.ShutDown()
klog.Infof("Starting daemon sets controller")
defer klog.Infof("Shutting down daemon sets controller")
logger := klog.FromContext(ctx)
logger.Info("Starting daemon sets controller")
defer logger.Info("Shutting down daemon sets controller")
if !cache.WaitForNamedCacheSync("daemon sets", ctx.Done(), dsc.podStoreSynced, dsc.nodeStoreSynced, dsc.historyStoreSynced, dsc.dsStoreSynced) {
return
@ -341,7 +365,7 @@ func (dsc *DaemonSetsController) getDaemonSetsForPod(pod *v1.Pod) []*apps.Daemon
// getDaemonSetsForHistory returns a list of DaemonSets that potentially
// match a ControllerRevision.
func (dsc *DaemonSetsController) getDaemonSetsForHistory(history *apps.ControllerRevision) []*apps.DaemonSet {
func (dsc *DaemonSetsController) getDaemonSetsForHistory(logger klog.Logger, history *apps.ControllerRevision) []*apps.DaemonSet {
daemonSets, err := dsc.dsLister.GetHistoryDaemonSets(history)
if err != nil || len(daemonSets) == 0 {
return nil
@ -349,20 +373,20 @@ func (dsc *DaemonSetsController) getDaemonSetsForHistory(history *apps.Controlle
if len(daemonSets) > 1 {
// ControllerRef will ensure we don't do anything crazy, but more than one
// item in this list nevertheless constitutes user error.
klog.V(4).Infof("User error! more than one DaemonSets is selecting ControllerRevision %s/%s with labels: %#v",
history.Namespace, history.Name, history.Labels)
logger.V(4).Info("Found more than one DaemonSet selecting the ControllerRevision. This is potentially a user error",
"controllerRevision", klog.KObj(history), "labels", history.Labels)
}
return daemonSets
}
// addHistory enqueues the DaemonSet that manages a ControllerRevision when the ControllerRevision is created
// or when the controller manager is restarted.
func (dsc *DaemonSetsController) addHistory(obj interface{}) {
func (dsc *DaemonSetsController) addHistory(logger klog.Logger, obj interface{}) {
history := obj.(*apps.ControllerRevision)
if history.DeletionTimestamp != nil {
// On a restart of the controller manager, it's possible for an object to
// show up in a state that is already pending deletion.
dsc.deleteHistory(history)
dsc.deleteHistory(logger, history)
return
}
@ -372,17 +396,17 @@ func (dsc *DaemonSetsController) addHistory(obj interface{}) {
if ds == nil {
return
}
klog.V(4).Infof("ControllerRevision %s added.", history.Name)
logger.V(4).Info("Observed a ControllerRevision", "controllerRevision", klog.KObj(history))
return
}
// Otherwise, it's an orphan. Get a list of all matching DaemonSets and sync
// them to see if anyone wants to adopt it.
daemonSets := dsc.getDaemonSetsForHistory(history)
daemonSets := dsc.getDaemonSetsForHistory(logger, history)
if len(daemonSets) == 0 {
return
}
klog.V(4).Infof("Orphan ControllerRevision %s added.", history.Name)
logger.V(4).Info("Orphan ControllerRevision added", "controllerRevision", klog.KObj(history))
for _, ds := range daemonSets {
dsc.enqueueDaemonSet(ds)
}
@ -391,7 +415,7 @@ func (dsc *DaemonSetsController) addHistory(obj interface{}) {
// updateHistory figures out what DaemonSet(s) manage a ControllerRevision when the ControllerRevision
// is updated and wake them up. If anything of the ControllerRevision has changed, we need to awaken
// both the old and new DaemonSets.
func (dsc *DaemonSetsController) updateHistory(old, cur interface{}) {
func (dsc *DaemonSetsController) updateHistory(logger klog.Logger, old, cur interface{}) {
curHistory := cur.(*apps.ControllerRevision)
oldHistory := old.(*apps.ControllerRevision)
if curHistory.ResourceVersion == oldHistory.ResourceVersion {
@ -415,7 +439,7 @@ func (dsc *DaemonSetsController) updateHistory(old, cur interface{}) {
if ds == nil {
return
}
klog.V(4).Infof("ControllerRevision %s updated.", curHistory.Name)
logger.V(4).Info("Observed an update to a ControllerRevision", "controllerRevision", klog.KObj(curHistory))
dsc.enqueueDaemonSet(ds)
return
}
@ -424,11 +448,11 @@ func (dsc *DaemonSetsController) updateHistory(old, cur interface{}) {
// to see if anyone wants to adopt it now.
labelChanged := !reflect.DeepEqual(curHistory.Labels, oldHistory.Labels)
if labelChanged || controllerRefChanged {
daemonSets := dsc.getDaemonSetsForHistory(curHistory)
daemonSets := dsc.getDaemonSetsForHistory(logger, curHistory)
if len(daemonSets) == 0 {
return
}
klog.V(4).Infof("Orphan ControllerRevision %s updated.", curHistory.Name)
logger.V(4).Info("Orphan ControllerRevision updated", "controllerRevision", klog.KObj(curHistory))
for _, ds := range daemonSets {
dsc.enqueueDaemonSet(ds)
}
@ -438,7 +462,7 @@ func (dsc *DaemonSetsController) updateHistory(old, cur interface{}) {
// deleteHistory enqueues the DaemonSet that manages a ControllerRevision when
// the ControllerRevision is deleted. obj could be an *app.ControllerRevision, or
// a DeletionFinalStateUnknown marker item.
func (dsc *DaemonSetsController) deleteHistory(obj interface{}) {
func (dsc *DaemonSetsController) deleteHistory(logger klog.Logger, obj interface{}) {
history, ok := obj.(*apps.ControllerRevision)
// When a delete is dropped, the relist will notice a ControllerRevision in the store not
@ -467,17 +491,17 @@ func (dsc *DaemonSetsController) deleteHistory(obj interface{}) {
if ds == nil {
return
}
klog.V(4).Infof("ControllerRevision %s deleted.", history.Name)
logger.V(4).Info("ControllerRevision deleted", "controllerRevision", klog.KObj(history))
dsc.enqueueDaemonSet(ds)
}
func (dsc *DaemonSetsController) addPod(obj interface{}) {
func (dsc *DaemonSetsController) addPod(logger klog.Logger, obj interface{}) {
pod := obj.(*v1.Pod)
if pod.DeletionTimestamp != nil {
// on a restart of the controller manager, it's possible a new pod shows up in a state that
// is already pending deletion. Prevent the pod from being a creation observation.
dsc.deletePod(pod)
dsc.deletePod(logger, pod)
return
}
@ -491,7 +515,7 @@ func (dsc *DaemonSetsController) addPod(obj interface{}) {
if err != nil {
return
}
klog.V(4).Infof("Pod %s added.", pod.Name)
logger.V(4).Info("Pod added", "pod", klog.KObj(pod))
dsc.expectations.CreationObserved(dsKey)
dsc.enqueueDaemonSet(ds)
return
@ -505,7 +529,7 @@ func (dsc *DaemonSetsController) addPod(obj interface{}) {
if len(dss) == 0 {
return
}
klog.V(4).Infof("Orphan Pod %s added.", pod.Name)
logger.V(4).Info("Orphan Pod added", "pod", klog.KObj(pod))
for _, ds := range dss {
dsc.enqueueDaemonSet(ds)
}
@ -514,7 +538,7 @@ func (dsc *DaemonSetsController) addPod(obj interface{}) {
// When a pod is updated, figure out what sets manage it and wake them
// up. If the labels of the pod have changed we need to awaken both the old
// and new set. old and cur must be *v1.Pod types.
func (dsc *DaemonSetsController) updatePod(old, cur interface{}) {
func (dsc *DaemonSetsController) updatePod(logger klog.Logger, old, cur interface{}) {
curPod := cur.(*v1.Pod)
oldPod := old.(*v1.Pod)
if curPod.ResourceVersion == oldPod.ResourceVersion {
@ -528,7 +552,7 @@ func (dsc *DaemonSetsController) updatePod(old, cur interface{}) {
// and after such time has passed, the kubelet actually deletes it from the store. We receive an update
// for modification of the deletion timestamp and expect an ds to create more replicas asap, not wait
// until the kubelet actually deletes the pod.
dsc.deletePod(curPod)
dsc.deletePod(logger, curPod)
return
}
@ -548,7 +572,7 @@ func (dsc *DaemonSetsController) updatePod(old, cur interface{}) {
if ds == nil {
return
}
klog.V(4).Infof("Pod %s updated.", curPod.Name)
logger.V(4).Info("Pod updated", "pod", klog.KObj(curPod))
dsc.enqueueDaemonSet(ds)
changedToReady := !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod)
// See https://github.com/kubernetes/kubernetes/pull/38076 for more details
@ -566,7 +590,7 @@ func (dsc *DaemonSetsController) updatePod(old, cur interface{}) {
if len(dss) == 0 {
return
}
klog.V(4).Infof("Orphan Pod %s updated.", curPod.Name)
logger.V(4).Info("Orphan Pod updated", "pod", klog.KObj(curPod))
labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
if labelChanged || controllerRefChanged {
for _, ds := range dss {
@ -575,7 +599,7 @@ func (dsc *DaemonSetsController) updatePod(old, cur interface{}) {
}
}
func (dsc *DaemonSetsController) deletePod(obj interface{}) {
func (dsc *DaemonSetsController) deletePod(logger klog.Logger, obj interface{}) {
pod, ok := obj.(*v1.Pod)
// When a delete is dropped, the relist will notice a pod in the store not
// in the list, leading to the insertion of a tombstone object which contains
@ -608,16 +632,16 @@ func (dsc *DaemonSetsController) deletePod(obj interface{}) {
if err != nil {
return
}
klog.V(4).Infof("Pod %s deleted.", pod.Name)
logger.V(4).Info("Pod deleted", "pod", klog.KObj(pod))
dsc.expectations.DeletionObserved(dsKey)
dsc.enqueueDaemonSet(ds)
}
func (dsc *DaemonSetsController) addNode(obj interface{}) {
func (dsc *DaemonSetsController) addNode(logger klog.Logger, obj interface{}) {
// TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too).
dsList, err := dsc.dsLister.List(labels.Everything())
if err != nil {
klog.V(4).Infof("Error enqueueing daemon sets: %v", err)
logger.V(4).Info("Error enqueueing daemon sets", "err", err)
return
}
node := obj.(*v1.Node)
@ -666,7 +690,7 @@ func shouldIgnoreNodeUpdate(oldNode, curNode v1.Node) bool {
return apiequality.Semantic.DeepEqual(oldNode, curNode)
}
func (dsc *DaemonSetsController) updateNode(old, cur interface{}) {
func (dsc *DaemonSetsController) updateNode(logger klog.Logger, old, cur interface{}) {
oldNode := old.(*v1.Node)
curNode := cur.(*v1.Node)
if shouldIgnoreNodeUpdate(*oldNode, *curNode) {
@ -675,7 +699,7 @@ func (dsc *DaemonSetsController) updateNode(old, cur interface{}) {
dsList, err := dsc.dsLister.List(labels.Everything())
if err != nil {
klog.V(4).Infof("Error listing daemon sets: %v", err)
logger.V(4).Info("Error listing daemon sets", "err", err)
return
}
// TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too).
@ -733,11 +757,12 @@ func (dsc *DaemonSetsController) getNodesToDaemonPods(ctx context.Context, ds *a
}
// Group Pods by Node name.
nodeToDaemonPods := make(map[string][]*v1.Pod)
logger := klog.FromContext(ctx)
for _, pod := range claimedPods {
nodeName, err := util.GetTargetNodeName(pod)
if err != nil {
klog.Warningf("Failed to get target node name of Pod %v/%v in DaemonSet %v/%v",
pod.Namespace, pod.Name, ds.Namespace, ds.Name)
logger.Info("Failed to get target node name of Pod in DaemonSet",
"pod", klog.KObj(pod), "daemonset", klog.KObj(ds))
continue
}
@ -773,6 +798,7 @@ func (dsc *DaemonSetsController) resolveControllerRef(namespace string, controll
// - podsToDelete: the Pods need to be deleted on the node
// - err: unexpected error
func (dsc *DaemonSetsController) podsShouldBeOnNode(
logger klog.Logger,
node *v1.Node,
nodeToDaemonPods map[string][]*v1.Pod,
ds *apps.DaemonSet,
@ -803,8 +829,8 @@ func (dsc *DaemonSetsController) podsShouldBeOnNode(
inBackoff := dsc.failedPodsBackoff.IsInBackOffSinceUpdate(backoffKey, now)
if inBackoff {
delay := dsc.failedPodsBackoff.Get(backoffKey)
klog.V(4).Infof("Deleting failed pod %s/%s on node %s has been limited by backoff - %v remaining",
pod.Namespace, pod.Name, node.Name, delay)
logger.V(4).Info("Deleting failed pod on node has been limited by backoff",
"pod", klog.KObj(pod), "node", klog.KObj(node), "currentDelay", delay)
dsc.enqueueDaemonSetAfter(ds, delay)
continue
}
@ -812,7 +838,7 @@ func (dsc *DaemonSetsController) podsShouldBeOnNode(
dsc.failedPodsBackoff.Next(backoffKey, now)
msg := fmt.Sprintf("Found failed daemon pod %s/%s on node %s, will try to kill it", pod.Namespace, pod.Name, node.Name)
klog.V(2).Infof(msg)
logger.V(2).Info("Found failed daemon pod on node, will try to kill it", "pod", klog.KObj(pod), "node", klog.KObj(node))
// Emit an event so that it's discoverable to users.
dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedDaemonPodReason, msg)
podsToDelete = append(podsToDelete, pod.Name)
@ -866,10 +892,10 @@ func (dsc *DaemonSetsController) podsShouldBeOnNode(
if oldestNewPod != nil && oldestOldPod != nil {
switch {
case !podutil.IsPodReady(oldestOldPod):
klog.V(5).Infof("Pod %s/%s from daemonset %s is no longer ready and will be replaced with newer pod %s", oldestOldPod.Namespace, oldestOldPod.Name, ds.Name, oldestNewPod.Name)
logger.V(5).Info("Pod from daemonset is no longer ready and will be replaced with newer pod", "oldPod", klog.KObj(oldestOldPod), "daemonset", klog.KObj(ds), "newPod", klog.KObj(oldestNewPod))
podsToDelete = append(podsToDelete, oldestOldPod.Name)
case podutil.IsPodAvailable(oldestNewPod, ds.Spec.MinReadySeconds, metav1.Time{Time: dsc.failedPodsBackoff.Clock.Now()}):
klog.V(5).Infof("Pod %s/%s from daemonset %s is now ready and will replace older pod %s", oldestNewPod.Namespace, oldestNewPod.Name, ds.Name, oldestOldPod.Name)
logger.V(5).Info("Pod from daemonset is now ready and will replace older pod", "newPod", klog.KObj(oldestNewPod), "daemonset", klog.KObj(ds), "oldPod", klog.KObj(oldestOldPod))
podsToDelete = append(podsToDelete, oldestOldPod.Name)
}
}
@ -926,10 +952,11 @@ func (dsc *DaemonSetsController) manage(ctx context.Context, ds *apps.DaemonSet,
// For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon
// pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node.
logger := klog.FromContext(ctx)
var nodesNeedingDaemonPods, podsToDelete []string
for _, node := range nodeList {
nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode := dsc.podsShouldBeOnNode(
node, nodeToDaemonPods, ds, hash)
logger, node, nodeToDaemonPods, ds, hash)
nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodesNeedingDaemonPodsOnNode...)
podsToDelete = append(podsToDelete, podsToDeleteOnNode...)
@ -951,6 +978,7 @@ func (dsc *DaemonSetsController) manage(ctx context.Context, ds *apps.DaemonSet,
// returns slice with errors if any
func (dsc *DaemonSetsController) syncNodes(ctx context.Context, ds *apps.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error {
// We need to set expectations before creating/deleting pods to avoid race conditions.
logger := klog.FromContext(ctx)
dsKey, err := controller.KeyFunc(ds)
if err != nil {
return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
@ -971,7 +999,7 @@ func (dsc *DaemonSetsController) syncNodes(ctx context.Context, ds *apps.DaemonS
// error channel to communicate back failures. make the buffer big enough to avoid any blocking
errCh := make(chan error, createDiff+deleteDiff)
klog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v, creating %d", ds.Name, nodesNeedingDaemonPods, createDiff)
logger.V(4).Info("Nodes needing daemon pods for daemon set, creating", "daemonset", klog.KObj(ds), "needCount", nodesNeedingDaemonPods, "createCount", createDiff)
createWait := sync.WaitGroup{}
// If the returned error is not nil we have a parse error.
// The controller handles this via the hash.
@ -1014,7 +1042,7 @@ func (dsc *DaemonSetsController) syncNodes(ctx context.Context, ds *apps.DaemonS
}
}
if err != nil {
klog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
logger.V(2).Info("Failed creation, decrementing expectations for daemon set", "daemonset", klog.KObj(ds))
dsc.expectations.CreationObserved(dsKey)
errCh <- err
utilruntime.HandleError(err)
@ -1025,7 +1053,7 @@ func (dsc *DaemonSetsController) syncNodes(ctx context.Context, ds *apps.DaemonS
// any skipped pods that we never attempted to start shouldn't be expected.
skippedPods := createDiff - (batchSize + pos)
if errorCount < len(errCh) && skippedPods > 0 {
klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for set %q/%q", skippedPods, ds.Namespace, ds.Name)
logger.V(2).Info("Slow-start failure. Skipping creation pods, decrementing expectations for daemon set", "skippedPods", skippedPods, "daemonset", klog.KObj(ds))
dsc.expectations.LowerExpectations(dsKey, skippedPods, 0)
// The skipped pods will be retried later. The next controller resync will
// retry the slow start process.
@ -1033,7 +1061,7 @@ func (dsc *DaemonSetsController) syncNodes(ctx context.Context, ds *apps.DaemonS
}
}
klog.V(4).Infof("Pods to delete for daemon set %s: %+v, deleting %d", ds.Name, podsToDelete, deleteDiff)
logger.V(4).Info("Pods to delete for daemon set, deleting", "daemonset", klog.KObj(ds), "toDeleteCount", podsToDelete, "deleteCount", deleteDiff)
deleteWait := sync.WaitGroup{}
deleteWait.Add(deleteDiff)
for i := 0; i < deleteDiff; i++ {
@ -1042,7 +1070,7 @@ func (dsc *DaemonSetsController) syncNodes(ctx context.Context, ds *apps.DaemonS
if err := dsc.podControl.DeletePod(ctx, ds.Namespace, podsToDelete[ix], ds); err != nil {
dsc.expectations.DeletionObserved(dsKey)
if !apierrors.IsNotFound(err) {
klog.V(2).Infof("Failed deletion, decremented expectations for set %q/%q", ds.Namespace, ds.Name)
logger.V(2).Info("Failed deletion, decremented expectations for daemon set", "daemonset", klog.KObj(ds))
errCh <- err
utilruntime.HandleError(err)
}
@ -1116,7 +1144,8 @@ func storeDaemonSetStatus(
}
func (dsc *DaemonSetsController) updateDaemonSetStatus(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string, updateObservedGen bool) error {
klog.V(4).Infof("Updating daemon set status")
logger := klog.FromContext(ctx)
logger.V(4).Info("Updating daemon set status")
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds)
if err != nil {
return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
@ -1175,10 +1204,11 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ctx context.Context, ds *
}
func (dsc *DaemonSetsController) syncDaemonSet(ctx context.Context, key string) error {
logger := klog.FromContext(ctx)
startTime := dsc.failedPodsBackoff.Clock.Now()
defer func() {
klog.V(4).Infof("Finished syncing daemon set %q (%v)", key, dsc.failedPodsBackoff.Clock.Now().Sub(startTime))
logger.V(4).Info("Finished syncing daemon set", "daemonset", key, "time", dsc.failedPodsBackoff.Clock.Now().Sub(startTime))
}()
namespace, name, err := cache.SplitMetaNamespaceKey(key)
@ -1187,7 +1217,7 @@ func (dsc *DaemonSetsController) syncDaemonSet(ctx context.Context, key string)
}
ds, err := dsc.dsLister.DaemonSets(namespace).Get(name)
if apierrors.IsNotFound(err) {
klog.V(3).Infof("daemon set has been deleted %v", key)
logger.V(3).Info("Daemon set has been deleted", "daemonset", key)
dsc.expectations.DeleteExpectations(key)
return nil
}
@ -1244,7 +1274,7 @@ func (dsc *DaemonSetsController) syncDaemonSet(ctx context.Context, key string)
case err != nil && statusErr != nil:
// If there was an error, and we failed to update status,
// log it and return the original error.
klog.ErrorS(statusErr, "Failed to update status", "daemonSet", klog.KObj(ds))
klog.Error(statusErr, "Failed to update status", "daemonSet", klog.KObj(ds))
return err
case err != nil:
return err

View File

@ -44,7 +44,7 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/scheduling"
@ -310,11 +310,12 @@ type daemonSetsController struct {
fakeRecorder *record.FakeRecorder
}
func newTestController(initialObjects ...runtime.Object) (*daemonSetsController, *fakePodControl, *fake.Clientset, error) {
func newTestController(ctx context.Context, initialObjects ...runtime.Object) (*daemonSetsController, *fakePodControl, *fake.Clientset, error) {
clientset := fake.NewSimpleClientset(initialObjects...)
informerFactory := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
dsc, err := NewDaemonSetsController(
ctx,
informerFactory.Apps().V1().DaemonSets(),
informerFactory.Apps().V1().ControllerRevisions(),
informerFactory.Core().V1().Pods(),
@ -417,7 +418,7 @@ func expectSyncDaemonSetsWithError(t *testing.T, manager *daemonSetsController,
// clearExpectations copies the FakePodControl to PodStore and clears the create and delete expectations.
func clearExpectations(t *testing.T, manager *daemonSetsController, ds *apps.DaemonSet, fakePodControl *fakePodControl) {
fakePodControl.Clear()
logger, _ := ktesting.NewTestContext(t)
key, err := controller.KeyFunc(ds)
if err != nil {
t.Errorf("Could not get key for daemon.")
@ -455,13 +456,14 @@ func clearExpectations(t *testing.T, manager *daemonSetsController, ds *apps.Dae
}
sort.Strings(lines)
for _, line := range lines {
klog.Info(line)
logger.Info(line)
}
}
func TestDeleteFinalStateUnknown(t *testing.T) {
for _, strategy := range updateStrategies() {
manager, _, _, err := newTestController()
logger, ctx := ktesting.NewTestContext(t)
manager, _, _, err := newTestController(ctx)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -469,7 +471,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
// DeletedFinalStateUnknown should queue the embedded DS if found.
manager.deleteDaemonset(cache.DeletedFinalStateUnknown{Key: "foo", Obj: ds})
manager.deleteDaemonset(logger, cache.DeletedFinalStateUnknown{Key: "foo", Obj: ds})
enqueuedKey, _ := manager.queue.Get()
if enqueuedKey.(string) != "default/foo" {
t.Errorf("expected delete of DeletedFinalStateUnknown to enqueue the daemonset but found: %#v", enqueuedKey)
@ -482,8 +484,10 @@ func TestExpectationsOnRecreate(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
_, ctx := ktesting.NewTestContext(t)
f := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
dsc, err := NewDaemonSetsController(
ctx,
f.Apps().V1().DaemonSets(),
f.Apps().V1().ControllerRevisions(),
f.Core().V1().Pods(),
@ -685,7 +689,8 @@ func TestSimpleDaemonSetLaunchesPods(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
manager, podControl, _, err := newTestController(ds)
_, ctx := ktesting.NewTestContext(t)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -705,7 +710,8 @@ func TestSimpleDaemonSetScheduleDaemonSetPodsLaunchesPods(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
manager, podControl, _, err := newTestController(ds)
_, ctx := ktesting.NewTestContext(t)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -781,7 +787,8 @@ func TestSimpleDaemonSetPodCreateErrors(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
manager, podControl, clientset, err := newTestController(ds)
_, ctx := ktesting.NewTestContext(t)
manager, podControl, clientset, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -832,7 +839,8 @@ func TestDaemonSetPodCreateExpectationsError(t *testing.T) {
for _, strategy := range strategies {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
manager, podControl, _, err := newTestController(ds)
_, ctx := ktesting.NewTestContext(t)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -861,7 +869,8 @@ func TestSimpleDaemonSetUpdatesStatusAfterLaunchingPods(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
manager, podControl, clientset, err := newTestController(ds)
_, ctx := ktesting.NewTestContext(t)
manager, podControl, clientset, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -928,7 +937,8 @@ func TestSimpleDaemonSetUpdatesStatusError(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
manager, podControl, clientset, err := newTestController(ds)
_, ctx := ktesting.NewTestContext(t)
manager, podControl, clientset, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -960,7 +970,8 @@ func TestSimpleDaemonSetUpdatesStatusError(t *testing.T) {
// DaemonSets should do nothing if there aren't any nodes
func TestNoNodesDoesNothing(t *testing.T) {
for _, strategy := range updateStrategies() {
manager, podControl, _, err := newTestController()
_, ctx := ktesting.NewTestContext(t)
manager, podControl, _, err := newTestController(ctx)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -981,7 +992,8 @@ func TestOneNodeDaemonLaunchesPod(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
manager, podControl, _, err := newTestController(ds)
_, ctx := ktesting.NewTestContext(t)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -1003,7 +1015,8 @@ func TestNotReadyNodeDaemonDoesLaunchPod(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
manager, podControl, _, err := newTestController(ds)
_, ctx := ktesting.NewTestContext(t)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -1068,7 +1081,8 @@ func TestInsufficientCapacityNodeDaemonDoesNotUnscheduleRunningPod(t *testing.T)
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
ds.Spec.Template.Spec = podSpec
manager, podControl, _, err := newTestController(ds)
_, ctx := ktesting.NewTestContext(t)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -1101,11 +1115,12 @@ func TestInsufficientCapacityNodeDaemonDoesNotUnscheduleRunningPod(t *testing.T)
// DaemonSets should only place onto nodes with sufficient free resource and matched node selector
func TestInsufficientCapacityNodeSufficientCapacityWithNodeLabelDaemonLaunchPod(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
podSpec := resourcePodSpecWithoutNodeName("50M", "75m")
ds := newDaemonSet("foo")
ds.Spec.Template.Spec = podSpec
ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel
manager, podControl, _, err := newTestController(ds)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -1137,7 +1152,8 @@ func TestNetworkUnavailableNodeDaemonLaunchesPod(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("simple")
ds.Spec.UpdateStrategy = *strategy
manager, podControl, _, err := newTestController(ds)
_, ctx := ktesting.NewTestContext(t)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -1168,7 +1184,8 @@ func TestDontDoAnythingIfBeingDeleted(t *testing.T) {
ds.Spec.Template.Spec = podSpec
now := metav1.Now()
ds.DeletionTimestamp = &now
manager, podControl, _, err := newTestController(ds)
_, ctx := ktesting.NewTestContext(t)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -1199,7 +1216,8 @@ func TestDontDoAnythingIfBeingDeletedRace(t *testing.T) {
ds.Spec.UpdateStrategy = *strategy
now := metav1.Now()
ds.DeletionTimestamp = &now
manager, podControl, _, err := newTestController(ds)
_, ctx := ktesting.NewTestContext(t)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -1238,7 +1256,8 @@ func TestPortConflictWithSameDaemonPodDoesNotDeletePod(t *testing.T) {
}},
}},
}
manager, podControl, _, err := newTestController()
_, ctx := ktesting.NewTestContext(t)
manager, podControl, _, err := newTestController(ctx)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -1285,7 +1304,8 @@ func TestNoPortConflictNodeDaemonLaunchesPod(t *testing.T) {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
ds.Spec.Template.Spec = podSpec2
manager, podControl, _, err := newTestController(ds)
_, ctx := ktesting.NewTestContext(t)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -1328,7 +1348,8 @@ func TestPodIsNotDeletedByDaemonsetWithEmptyLabelSelector(t *testing.T) {
ds.Spec.Selector = &ls
ds.Spec.Template.Spec.NodeSelector = map[string]string{"foo": "bar"}
manager, podControl, _, err := newTestController(ds)
_, ctx := ktesting.NewTestContext(t)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -1363,7 +1384,8 @@ func TestDealsWithExistingPods(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
manager, podControl, _, err := newTestController(ds)
_, ctx := ktesting.NewTestContext(t)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -1386,7 +1408,8 @@ func TestSelectorDaemonLaunchesPods(t *testing.T) {
daemon := newDaemonSet("foo")
daemon.Spec.UpdateStrategy = *strategy
daemon.Spec.Template.Spec.NodeSelector = simpleNodeLabel
manager, podControl, _, err := newTestController(daemon)
_, ctx := ktesting.NewTestContext(t)
manager, podControl, _, err := newTestController(ctx, daemon)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -1406,7 +1429,8 @@ func TestSelectorDaemonDeletesUnselectedPods(t *testing.T) {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel
manager, podControl, _, err := newTestController(ds)
_, ctx := ktesting.NewTestContext(t)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -1430,7 +1454,8 @@ func TestSelectorDaemonDealsWithExistingPods(t *testing.T) {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel
manager, podControl, _, err := newTestController(ds)
_, ctx := ktesting.NewTestContext(t)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -1455,7 +1480,8 @@ func TestSelectorDaemonDealsWithExistingPods(t *testing.T) {
// DaemonSet with node selector which does not match any node labels should not launch pods.
func TestBadSelectorDaemonDoesNothing(t *testing.T) {
for _, strategy := range updateStrategies() {
manager, podControl, _, err := newTestController()
_, ctx := ktesting.NewTestContext(t)
manager, podControl, _, err := newTestController(ctx)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -1478,7 +1504,8 @@ func TestNameDaemonSetLaunchesPods(t *testing.T) {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
ds.Spec.Template.Spec.NodeName = "node-0"
manager, podControl, _, err := newTestController(ds)
_, ctx := ktesting.NewTestContext(t)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -1497,7 +1524,8 @@ func TestBadNameDaemonSetDoesNothing(t *testing.T) {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
ds.Spec.Template.Spec.NodeName = "node-10"
manager, podControl, _, err := newTestController(ds)
_, ctx := ktesting.NewTestContext(t)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -1517,7 +1545,8 @@ func TestNameAndSelectorDaemonSetLaunchesPods(t *testing.T) {
ds.Spec.UpdateStrategy = *strategy
ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel
ds.Spec.Template.Spec.NodeName = "node-6"
manager, podControl, _, err := newTestController(ds)
_, ctx := ktesting.NewTestContext(t)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -1538,7 +1567,8 @@ func TestInconsistentNameSelectorDaemonSetDoesNothing(t *testing.T) {
ds.Spec.UpdateStrategy = *strategy
ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel
ds.Spec.Template.Spec.NodeName = "node-0"
manager, podControl, _, err := newTestController(ds)
_, ctx := ktesting.NewTestContext(t)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -1554,9 +1584,10 @@ func TestInconsistentNameSelectorDaemonSetDoesNothing(t *testing.T) {
// DaemonSet with node selector, matching some nodes, should launch pods on all the nodes.
func TestSelectorDaemonSetLaunchesPods(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ds := newDaemonSet("foo")
ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel
manager, podControl, _, err := newTestController(ds)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -1591,8 +1622,8 @@ func TestNodeAffinityDaemonLaunchesPods(t *testing.T) {
},
},
}
manager, podControl, _, err := newTestController(daemon)
_, ctx := ktesting.NewTestContext(t)
manager, podControl, _, err := newTestController(ctx, daemon)
if err != nil {
t.Fatalf("error creating DaemonSetsController: %v", err)
}
@ -1610,7 +1641,8 @@ func TestNumberReadyStatus(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
manager, podControl, clientset, err := newTestController(ds)
_, ctx := ktesting.NewTestContext(t)
manager, podControl, clientset, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -1656,7 +1688,8 @@ func TestObservedGeneration(t *testing.T) {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
ds.Generation = 1
manager, podControl, clientset, err := newTestController(ds)
_, ctx := ktesting.NewTestContext(t)
manager, podControl, clientset, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -1702,7 +1735,8 @@ func TestDaemonKillFailedPods(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
manager, podControl, _, err := newTestController(ds)
_, ctx := ktesting.NewTestContext(t)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -1723,10 +1757,11 @@ func TestDaemonKillFailedPods(t *testing.T) {
func TestDaemonKillFailedPodsBackoff(t *testing.T) {
for _, strategy := range updateStrategies() {
t.Run(string(strategy.Type), func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
manager, podControl, _, err := newTestController(ds)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -1794,7 +1829,8 @@ func TestNoScheduleTaintedDoesntEvicitRunningIntolerantPod(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("intolerant")
ds.Spec.UpdateStrategy = *strategy
manager, podControl, _, err := newTestController(ds)
_, ctx := ktesting.NewTestContext(t)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -1824,7 +1860,8 @@ func TestNoExecuteTaintedDoesEvicitRunningIntolerantPod(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("intolerant")
ds.Spec.UpdateStrategy = *strategy
manager, podControl, _, err := newTestController(ds)
_, ctx := ktesting.NewTestContext(t)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -1853,7 +1890,8 @@ func TestTaintedNodeDaemonDoesNotLaunchIntolerantPod(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("intolerant")
ds.Spec.UpdateStrategy = *strategy
manager, podControl, _, err := newTestController(ds)
_, ctx := ktesting.NewTestContext(t)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -1879,7 +1917,8 @@ func TestTaintedNodeDaemonLaunchesToleratePod(t *testing.T) {
ds := newDaemonSet("tolerate")
ds.Spec.UpdateStrategy = *strategy
setDaemonSetToleration(ds, noScheduleTolerations)
manager, podControl, _, err := newTestController(ds)
_, ctx := ktesting.NewTestContext(t)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -1904,7 +1943,8 @@ func TestNotReadyNodeDaemonLaunchesPod(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("simple")
ds.Spec.UpdateStrategy = *strategy
manager, podControl, _, err := newTestController(ds)
_, ctx := ktesting.NewTestContext(t)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -1932,7 +1972,8 @@ func TestUnreachableNodeDaemonLaunchesPod(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("simple")
ds.Spec.UpdateStrategy = *strategy
manager, podControl, _, err := newTestController(ds)
_, ctx := ktesting.NewTestContext(t)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -1961,7 +2002,8 @@ func TestNodeDaemonLaunchesToleratePod(t *testing.T) {
ds := newDaemonSet("tolerate")
ds.Spec.UpdateStrategy = *strategy
setDaemonSetToleration(ds, noScheduleTolerations)
manager, podControl, _, err := newTestController(ds)
_, ctx := ktesting.NewTestContext(t)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -1980,7 +2022,8 @@ func TestDaemonSetRespectsTermination(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
manager, podControl, _, err := newTestController(ds)
_, ctx := ktesting.NewTestContext(t)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -2015,7 +2058,8 @@ func TestTaintPressureNodeDaemonLaunchesPod(t *testing.T) {
ds := newDaemonSet("critical")
ds.Spec.UpdateStrategy = *strategy
setDaemonSetCritical(ds)
manager, podControl, _, err := newTestController(ds)
_, ctx := ktesting.NewTestContext(t)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -2329,7 +2373,8 @@ func TestNodeShouldRunDaemonPod(t *testing.T) {
node.Status.Conditions = append(node.Status.Conditions, c.nodeCondition...)
node.Status.Allocatable = allocatableResources("100M", "1")
node.Spec.Unschedulable = c.nodeUnschedulable
manager, _, _, err := newTestController()
_, ctx := ktesting.NewTestContext(t)
manager, _, _, err := newTestController(ctx)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -2431,7 +2476,8 @@ func TestUpdateNode(t *testing.T) {
}
for _, c := range cases {
for _, strategy := range updateStrategies() {
manager, podControl, _, err := newTestController()
logger, ctx := ktesting.NewTestContext(t)
manager, podControl, _, err := newTestController(ctx)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -2462,7 +2508,7 @@ func TestUpdateNode(t *testing.T) {
}
enqueued = false
manager.updateNode(c.oldNode, c.newNode)
manager.updateNode(logger, c.oldNode, c.newNode)
if enqueued != c.shouldEnqueue {
t.Errorf("Test case: '%s', expected: %t, got: %t", c.test, c.shouldEnqueue, enqueued)
}
@ -2612,7 +2658,8 @@ func TestDeleteNoDaemonPod(t *testing.T) {
for _, c := range cases {
for _, strategy := range updateStrategies() {
manager, podControl, _, err := newTestController()
logger, ctx := ktesting.NewTestContext(t)
manager, podControl, _, err := newTestController(ctx)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -2639,7 +2686,7 @@ func TestDeleteNoDaemonPod(t *testing.T) {
}
enqueued = false
manager.deletePod(c.deletedPod)
manager.deletePod(logger, c.deletedPod)
if enqueued != c.shouldEnqueue {
t.Errorf("Test case: '%s', expected: %t, got: %t", c.test, c.shouldEnqueue, enqueued)
}
@ -2651,7 +2698,8 @@ func TestDeleteUnscheduledPodForNotExistingNode(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
manager, podControl, _, err := newTestController(ds)
_, ctx := ktesting.NewTestContext(t)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -2695,7 +2743,8 @@ func TestGetNodesToDaemonPods(t *testing.T) {
ds.Spec.UpdateStrategy = *strategy
ds2 := newDaemonSet("foo2")
ds2.Spec.UpdateStrategy = *strategy
manager, _, _, err := newTestController(ds, ds2)
_, ctx := ktesting.NewTestContext(t)
manager, _, _, err := newTestController(ctx, ds, ds2)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -2762,7 +2811,8 @@ func TestGetNodesToDaemonPods(t *testing.T) {
}
func TestAddNode(t *testing.T) {
manager, _, _, err := newTestController()
logger, ctx := ktesting.NewTestContext(t)
manager, _, _, err := newTestController(ctx)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -2773,14 +2823,13 @@ func TestAddNode(t *testing.T) {
if err != nil {
t.Fatal(err)
}
manager.addNode(node1)
manager.addNode(logger, node1)
if got, want := manager.queue.Len(), 0; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
node2 := newNode("node2", simpleNodeLabel)
manager.addNode(node2)
manager.addNode(logger, node2)
if got, want := manager.queue.Len(), 1; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
@ -2792,7 +2841,8 @@ func TestAddNode(t *testing.T) {
func TestAddPod(t *testing.T) {
for _, strategy := range updateStrategies() {
manager, _, _, err := newTestController()
logger, ctx := ktesting.NewTestContext(t)
manager, _, _, err := newTestController(ctx)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -2808,9 +2858,8 @@ func TestAddPod(t *testing.T) {
if err != nil {
t.Fatal(err)
}
pod1 := newPod("pod1-", "node-0", simpleDaemonSetLabel, ds1)
manager.addPod(pod1)
manager.addPod(logger, pod1)
if got, want := manager.queue.Len(), 1; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
@ -2824,7 +2873,7 @@ func TestAddPod(t *testing.T) {
}
pod2 := newPod("pod2-", "node-0", simpleDaemonSetLabel, ds2)
manager.addPod(pod2)
manager.addPod(logger, pod2)
if got, want := manager.queue.Len(), 1; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
@ -2841,7 +2890,8 @@ func TestAddPod(t *testing.T) {
func TestAddPodOrphan(t *testing.T) {
for _, strategy := range updateStrategies() {
manager, _, _, err := newTestController()
logger, ctx := ktesting.NewTestContext(t)
manager, _, _, err := newTestController(ctx)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -2867,7 +2917,7 @@ func TestAddPodOrphan(t *testing.T) {
// Make pod an orphan. Expect matching sets to be queued.
pod := newPod("pod1-", "node-0", simpleDaemonSetLabel, nil)
manager.addPod(pod)
manager.addPod(logger, pod)
if got, want := manager.queue.Len(), 2; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
@ -2879,7 +2929,8 @@ func TestAddPodOrphan(t *testing.T) {
func TestUpdatePod(t *testing.T) {
for _, strategy := range updateStrategies() {
manager, _, _, err := newTestController()
logger, ctx := ktesting.NewTestContext(t)
manager, _, _, err := newTestController(ctx)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -2899,7 +2950,7 @@ func TestUpdatePod(t *testing.T) {
pod1 := newPod("pod1-", "node-0", simpleDaemonSetLabel, ds1)
prev := *pod1
bumpResourceVersion(pod1)
manager.updatePod(&prev, pod1)
manager.updatePod(logger, &prev, pod1)
if got, want := manager.queue.Len(), 1; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
@ -2915,7 +2966,7 @@ func TestUpdatePod(t *testing.T) {
pod2 := newPod("pod2-", "node-0", simpleDaemonSetLabel, ds2)
prev = *pod2
bumpResourceVersion(pod2)
manager.updatePod(&prev, pod2)
manager.updatePod(logger, &prev, pod2)
if got, want := manager.queue.Len(), 1; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
@ -2932,7 +2983,8 @@ func TestUpdatePod(t *testing.T) {
func TestUpdatePodOrphanSameLabels(t *testing.T) {
for _, strategy := range updateStrategies() {
manager, _, _, err := newTestController()
logger, ctx := ktesting.NewTestContext(t)
manager, _, _, err := newTestController(ctx)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -2952,7 +3004,7 @@ func TestUpdatePodOrphanSameLabels(t *testing.T) {
pod := newPod("pod1-", "node-0", simpleDaemonSetLabel, nil)
prev := *pod
bumpResourceVersion(pod)
manager.updatePod(&prev, pod)
manager.updatePod(logger, &prev, pod)
if got, want := manager.queue.Len(), 0; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
@ -2961,7 +3013,8 @@ func TestUpdatePodOrphanSameLabels(t *testing.T) {
func TestUpdatePodOrphanWithNewLabels(t *testing.T) {
for _, strategy := range updateStrategies() {
manager, _, _, err := newTestController()
logger, ctx := ktesting.NewTestContext(t)
manager, _, _, err := newTestController(ctx)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -2982,7 +3035,7 @@ func TestUpdatePodOrphanWithNewLabels(t *testing.T) {
prev := *pod
prev.Labels = map[string]string{"foo2": "bar2"}
bumpResourceVersion(pod)
manager.updatePod(&prev, pod)
manager.updatePod(logger, &prev, pod)
if got, want := manager.queue.Len(), 2; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
@ -2996,7 +3049,8 @@ func TestUpdatePodChangeControllerRef(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
manager, _, _, err := newTestController()
logger, ctx := ktesting.NewTestContext(t)
manager, _, _, err := newTestController(ctx)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -3015,7 +3069,7 @@ func TestUpdatePodChangeControllerRef(t *testing.T) {
prev := *pod
prev.OwnerReferences = []metav1.OwnerReference{*metav1.NewControllerRef(ds2, controllerKind)}
bumpResourceVersion(pod)
manager.updatePod(&prev, pod)
manager.updatePod(logger, &prev, pod)
if got, want := manager.queue.Len(), 2; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
@ -3024,7 +3078,8 @@ func TestUpdatePodChangeControllerRef(t *testing.T) {
func TestUpdatePodControllerRefRemoved(t *testing.T) {
for _, strategy := range updateStrategies() {
manager, _, _, err := newTestController()
logger, ctx := ktesting.NewTestContext(t)
manager, _, _, err := newTestController(ctx)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -3045,7 +3100,7 @@ func TestUpdatePodControllerRefRemoved(t *testing.T) {
prev := *pod
pod.OwnerReferences = nil
bumpResourceVersion(pod)
manager.updatePod(&prev, pod)
manager.updatePod(logger, &prev, pod)
if got, want := manager.queue.Len(), 2; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
@ -3054,7 +3109,8 @@ func TestUpdatePodControllerRefRemoved(t *testing.T) {
func TestDeletePod(t *testing.T) {
for _, strategy := range updateStrategies() {
manager, _, _, err := newTestController()
logger, ctx := ktesting.NewTestContext(t)
manager, _, _, err := newTestController(ctx)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -3072,7 +3128,7 @@ func TestDeletePod(t *testing.T) {
}
pod1 := newPod("pod1-", "node-0", simpleDaemonSetLabel, ds1)
manager.deletePod(pod1)
manager.deletePod(logger, pod1)
if got, want := manager.queue.Len(), 1; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
@ -3086,7 +3142,7 @@ func TestDeletePod(t *testing.T) {
}
pod2 := newPod("pod2-", "node-0", simpleDaemonSetLabel, ds2)
manager.deletePod(pod2)
manager.deletePod(logger, pod2)
if got, want := manager.queue.Len(), 1; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
@ -3103,7 +3159,8 @@ func TestDeletePod(t *testing.T) {
func TestDeletePodOrphan(t *testing.T) {
for _, strategy := range updateStrategies() {
manager, _, _, err := newTestController()
logger, ctx := ktesting.NewTestContext(t)
manager, _, _, err := newTestController(ctx)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -3128,7 +3185,7 @@ func TestDeletePodOrphan(t *testing.T) {
}
pod := newPod("pod1-", "node-0", simpleDaemonSetLabel, nil)
manager.deletePod(pod)
manager.deletePod(logger, pod)
if got, want := manager.queue.Len(), 0; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
@ -3158,9 +3215,10 @@ func getQueuedKeys(queue workqueue.RateLimitingInterface) []string {
// Controller should not create pods on nodes which have daemon pods, and should remove excess pods from nodes that have extra pods.
func TestSurgeDealsWithExistingPods(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(1))
manager, podControl, _, err := newTestController(ds)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -3174,9 +3232,10 @@ func TestSurgeDealsWithExistingPods(t *testing.T) {
}
func TestSurgePreservesReadyOldPods(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(1))
manager, podControl, _, err := newTestController(ds)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -3213,9 +3272,10 @@ func TestSurgePreservesReadyOldPods(t *testing.T) {
}
func TestSurgeCreatesNewPodWhenAtMaxSurgeAndOldPodDeleted(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(1))
manager, podControl, _, err := newTestController(ds)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -3259,9 +3319,10 @@ func TestSurgeCreatesNewPodWhenAtMaxSurgeAndOldPodDeleted(t *testing.T) {
}
func TestSurgeDeletesUnreadyOldPods(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(1))
manager, podControl, _, err := newTestController(ds)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -3298,10 +3359,11 @@ func TestSurgeDeletesUnreadyOldPods(t *testing.T) {
}
func TestSurgePreservesOldReadyWithUnsatisfiedMinReady(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ds := newDaemonSet("foo")
ds.Spec.MinReadySeconds = 15
ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(1))
manager, podControl, _, err := newTestController(ds)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -3342,10 +3404,11 @@ func TestSurgePreservesOldReadyWithUnsatisfiedMinReady(t *testing.T) {
}
func TestSurgeDeletesOldReadyWithUnsatisfiedMinReady(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ds := newDaemonSet("foo")
ds.Spec.MinReadySeconds = 15
ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(1))
manager, podControl, _, err := newTestController(ds)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}

View File

@ -17,9 +17,5 @@ limitations under the License.
package daemon
import (
"k8s.io/klog/v2"
_ "k8s.io/klog/v2/ktesting/init"
)
func init() {
klog.InitFlags(nil)
}

View File

@ -42,11 +42,12 @@ import (
// rollingUpdate identifies the set of old pods to delete, or additional pods to create on nodes,
// remaining within the constraints imposed by the update strategy.
func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
logger := klog.FromContext(ctx)
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds)
if err != nil {
return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
}
maxSurge, maxUnavailable, err := dsc.updatedDesiredNodeCounts(ds, nodeList, nodeToDaemonPods)
maxSurge, maxUnavailable, err := dsc.updatedDesiredNodeCounts(ctx, ds, nodeList, nodeToDaemonPods)
if err != nil {
return fmt.Errorf("couldn't get unavailable numbers: %v", err)
}
@ -73,7 +74,7 @@ func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.Dae
newPod, oldPod, ok := findUpdatedPodsOnNode(ds, pods, hash)
if !ok {
// let the manage loop clean up this node, and treat it as an unavailable node
klog.V(3).Infof("DaemonSet %s/%s has excess pods on node %s, skipping to allow the core loop to process", ds.Namespace, ds.Name, nodeName)
logger.V(3).Info("DaemonSet has excess pods on node, skipping to allow the core loop to process", "daemonset", klog.KObj(ds), "node", klog.KRef("", nodeName))
numUnavailable++
continue
}
@ -92,7 +93,7 @@ func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.Dae
switch {
case !podutil.IsPodAvailable(oldPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}):
// the old pod isn't available, so it needs to be replaced
klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is out of date and not available, allowing replacement", ds.Namespace, ds.Name, oldPod.Name, nodeName)
logger.V(5).Info("DaemonSet pod on node is out of date and not available, allowing replacement", "daemonset", klog.KObj(ds), "pod", klog.KObj(oldPod), "node", klog.KRef("", nodeName))
// record the replacement
if allowedReplacementPods == nil {
allowedReplacementPods = make([]string, 0, len(nodeToDaemonPods))
@ -102,7 +103,7 @@ func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.Dae
// no point considering any other candidates
continue
default:
klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is out of date, this is a candidate to replace", ds.Namespace, ds.Name, oldPod.Name, nodeName)
logger.V(5).Info("DaemonSet pod on node is out of date, this is a candidate to replace", "daemonset", klog.KObj(ds), "pod", klog.KObj(oldPod), "node", klog.KRef("", nodeName))
// record the candidate
if candidatePodsToDelete == nil {
candidatePodsToDelete = make([]string, 0, maxUnavailable)
@ -113,7 +114,7 @@ func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.Dae
}
// use any of the candidates we can, including the allowedReplacemnntPods
klog.V(5).Infof("DaemonSet %s/%s allowing %d replacements, up to %d unavailable, %d new are unavailable, %d candidates", ds.Namespace, ds.Name, len(allowedReplacementPods), maxUnavailable, numUnavailable, len(candidatePodsToDelete))
logger.V(5).Info("DaemonSet allowing replacements", "daemonset", klog.KObj(ds), "replacements", len(allowedReplacementPods), "maxUnavailable", maxUnavailable, "numUnavailable", numUnavailable, "candidates", len(candidatePodsToDelete))
remainingUnavailable := maxUnavailable - numUnavailable
if remainingUnavailable < 0 {
remainingUnavailable = 0
@ -148,7 +149,7 @@ func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.Dae
newPod, oldPod, ok := findUpdatedPodsOnNode(ds, pods, hash)
if !ok {
// let the manage loop clean up this node, and treat it as a surge node
klog.V(3).Infof("DaemonSet %s/%s has excess pods on node %s, skipping to allow the core loop to process", ds.Namespace, ds.Name, nodeName)
logger.V(3).Info("DaemonSet has excess pods on node, skipping to allow the core loop to process", "daemonset", klog.KObj(ds), "node", klog.KRef("", nodeName))
numSurge++
continue
}
@ -160,7 +161,7 @@ func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.Dae
switch {
case !podutil.IsPodAvailable(oldPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}):
// the old pod isn't available, allow it to become a replacement
klog.V(5).Infof("Pod %s on node %s is out of date and not available, allowing replacement", ds.Namespace, ds.Name, oldPod.Name, nodeName)
logger.V(5).Info("Pod on node is out of date and not available, allowing replacement", "daemonset", klog.KObj(ds), "pod", klog.KObj(oldPod), "node", klog.KRef("", nodeName))
// record the replacement
if allowedNewNodes == nil {
allowedNewNodes = make([]string, 0, len(nodeToDaemonPods))
@ -170,7 +171,7 @@ func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.Dae
// no point considering any other candidates
continue
default:
klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is out of date, this is a surge candidate", ds.Namespace, ds.Name, oldPod.Name, nodeName)
logger.V(5).Info("DaemonSet pod on node is out of date, this is a surge candidate", "daemonset", klog.KObj(ds), "pod", klog.KObj(oldPod), "node", klog.KRef("", nodeName))
// record the candidate
if candidateNewNodes == nil {
candidateNewNodes = make([]string, 0, maxSurge)
@ -185,13 +186,13 @@ func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.Dae
continue
}
// we're available, delete the old pod
klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is available, remove %s", ds.Namespace, ds.Name, newPod.Name, nodeName, oldPod.Name)
logger.V(5).Info("DaemonSet pod on node is available, remove old pod", "daemonset", klog.KObj(ds), "newPod", klog.KObj(newPod), "node", nodeName, "oldPod", klog.KObj(oldPod))
oldPodsToDelete = append(oldPodsToDelete, oldPod.Name)
}
}
// use any of the candidates we can, including the allowedNewNodes
klog.V(5).Infof("DaemonSet %s/%s allowing %d replacements, surge up to %d, %d are in progress, %d candidates", ds.Namespace, ds.Name, len(allowedNewNodes), maxSurge, numSurge, len(candidateNewNodes))
logger.V(5).Info("DaemonSet allowing replacements", "daemonset", klog.KObj(ds), "replacements", len(allowedNewNodes), "maxSurge", maxSurge, "numSurge", numSurge, "candidates", len(candidateNewNodes))
remainingSurge := maxSurge - numSurge
if remainingSurge < 0 {
remainingSurge = 0
@ -484,6 +485,7 @@ func (dsc *DaemonSetsController) snapshot(ctx context.Context, ds *apps.DaemonSe
history, err = dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Create(ctx, history, metav1.CreateOptions{})
if outerErr := err; errors.IsAlreadyExists(outerErr) {
logger := klog.FromContext(ctx)
// TODO: Is it okay to get from historyLister?
existedHistory, getErr := dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Get(ctx, name, metav1.GetOptions{})
if getErr != nil {
@ -516,7 +518,7 @@ func (dsc *DaemonSetsController) snapshot(ctx context.Context, ds *apps.DaemonSe
if updateErr != nil {
return nil, updateErr
}
klog.V(2).Infof("Found a hash collision for DaemonSet %q - bumping collisionCount to %d to resolve it", ds.Name, *currDS.Status.CollisionCount)
logger.V(2).Info("Found a hash collision for DaemonSet - bumping collisionCount to resolve it", "daemonset", klog.KObj(ds), "collisionCount", *currDS.Status.CollisionCount)
return nil, outerErr
}
return history, err
@ -524,8 +526,9 @@ func (dsc *DaemonSetsController) snapshot(ctx context.Context, ds *apps.DaemonSe
// updatedDesiredNodeCounts calculates the true number of allowed unavailable or surge pods and
// updates the nodeToDaemonPods array to include an empty array for every node that is not scheduled.
func (dsc *DaemonSetsController) updatedDesiredNodeCounts(ds *apps.DaemonSet, nodeList []*v1.Node, nodeToDaemonPods map[string][]*v1.Pod) (int, int, error) {
func (dsc *DaemonSetsController) updatedDesiredNodeCounts(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, nodeToDaemonPods map[string][]*v1.Pod) (int, int, error) {
var desiredNumberScheduled int
logger := klog.FromContext(ctx)
for i := range nodeList {
node := nodeList[i]
wantToRun, _ := NodeShouldRunDaemonPod(node, ds)
@ -552,10 +555,10 @@ func (dsc *DaemonSetsController) updatedDesiredNodeCounts(ds *apps.DaemonSet, no
// if the daemonset returned with an impossible configuration, obey the default of unavailable=1 (in the
// event the apiserver returns 0 for both surge and unavailability)
if desiredNumberScheduled > 0 && maxUnavailable == 0 && maxSurge == 0 {
klog.Warningf("DaemonSet %s/%s is not configured for surge or unavailability, defaulting to accepting unavailability", ds.Namespace, ds.Name)
logger.Info("DaemonSet is not configured for surge or unavailability, defaulting to accepting unavailability", "daemonset", klog.KObj(ds))
maxUnavailable = 1
}
klog.V(5).Infof("DaemonSet %s/%s, maxSurge: %d, maxUnavailable: %d", ds.Namespace, ds.Name, maxSurge, maxUnavailable)
logger.V(5).Info("DaemonSet with maxSurge and maxUnavailable", "daemonset", klog.KObj(ds), "maxSurge", maxSurge, "maxUnavailable", maxUnavailable)
return maxSurge, maxUnavailable, nil
}

View File

@ -27,15 +27,16 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller/daemon/util"
testingclock "k8s.io/utils/clock/testing"
)
func TestDaemonSetUpdatesPods(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ds := newDaemonSet("foo")
manager, podControl, _, err := newTestController(ds)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -75,8 +76,9 @@ func TestDaemonSetUpdatesPods(t *testing.T) {
}
func TestDaemonSetUpdatesPodsWithMaxSurge(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ds := newDaemonSet("foo")
manager, podControl, _, err := newTestController(ds)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -116,8 +118,9 @@ func TestDaemonSetUpdatesPodsWithMaxSurge(t *testing.T) {
}
func TestDaemonSetUpdatesWhenNewPosIsNotReady(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ds := newDaemonSet("foo")
manager, podControl, _, err := newTestController(ds)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -152,8 +155,9 @@ func TestDaemonSetUpdatesWhenNewPosIsNotReady(t *testing.T) {
}
func TestDaemonSetUpdatesAllOldPodsNotReady(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ds := newDaemonSet("foo")
manager, podControl, _, err := newTestController(ds)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -187,8 +191,9 @@ func TestDaemonSetUpdatesAllOldPodsNotReady(t *testing.T) {
}
func TestDaemonSetUpdatesAllOldPodsNotReadyMaxSurge(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ds := newDaemonSet("foo")
manager, podControl, _, err := newTestController(ds)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -284,6 +289,7 @@ func podsByNodeMatchingHash(dsc *daemonSetsController, hash string) map[string][
func setPodReadiness(t *testing.T, dsc *daemonSetsController, ready bool, count int, fn func(*v1.Pod) bool) {
t.Helper()
logger, _ := ktesting.NewTestContext(t)
for _, obj := range dsc.podStore.List() {
if count <= 0 {
break
@ -310,7 +316,7 @@ func setPodReadiness(t *testing.T, dsc *daemonSetsController, ready bool, count
// TODO: workaround UpdatePodCondition calling time.Now() directly
setCondition := podutil.GetPodReadyCondition(pod.Status)
setCondition.LastTransitionTime.Time = dsc.failedPodsBackoff.Clock.Now()
klog.Infof("marked pod %s ready=%t", pod.Name, ready)
logger.Info("marked pod ready", "pod", pod.Name, "ready", ready)
count--
}
if count > 0 {
@ -329,8 +335,9 @@ func currentDSHash(dsc *daemonSetsController, ds *apps.DaemonSet) (string, error
}
func TestDaemonSetUpdatesNoTemplateChanged(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ds := newDaemonSet("foo")
manager, podControl, _, err := newTestController(ds)
manager, podControl, _, err := newTestController(ctx, ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@ -373,7 +380,7 @@ func newUpdateUnavailable(value intstr.IntOrString) apps.DaemonSetUpdateStrategy
func TestGetUnavailableNumbers(t *testing.T) {
cases := []struct {
name string
Manager *daemonSetsController
ManagerFunc func(ctx context.Context) *daemonSetsController
ds *apps.DaemonSet
nodeToPods map[string][]*v1.Pod
maxSurge int
@ -383,13 +390,13 @@ func TestGetUnavailableNumbers(t *testing.T) {
}{
{
name: "No nodes",
Manager: func() *daemonSetsController {
manager, _, _, err := newTestController()
ManagerFunc: func(ctx context.Context) *daemonSetsController {
manager, _, _, err := newTestController(ctx)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
return manager
}(),
},
ds: func() *apps.DaemonSet {
ds := newDaemonSet("x")
ds.Spec.UpdateStrategy = newUpdateUnavailable(intstr.FromInt(0))
@ -401,14 +408,14 @@ func TestGetUnavailableNumbers(t *testing.T) {
},
{
name: "Two nodes with ready pods",
Manager: func() *daemonSetsController {
manager, _, _, err := newTestController()
ManagerFunc: func(ctx context.Context) *daemonSetsController {
manager, _, _, err := newTestController(ctx)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
addNodes(manager.nodeStore, 0, 2, nil)
return manager
}(),
},
ds: func() *apps.DaemonSet {
ds := newDaemonSet("x")
ds.Spec.UpdateStrategy = newUpdateUnavailable(intstr.FromInt(1))
@ -429,14 +436,14 @@ func TestGetUnavailableNumbers(t *testing.T) {
},
{
name: "Two nodes, one node without pods",
Manager: func() *daemonSetsController {
manager, _, _, err := newTestController()
ManagerFunc: func(ctx context.Context) *daemonSetsController {
manager, _, _, err := newTestController(ctx)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
addNodes(manager.nodeStore, 0, 2, nil)
return manager
}(),
},
ds: func() *apps.DaemonSet {
ds := newDaemonSet("x")
ds.Spec.UpdateStrategy = newUpdateUnavailable(intstr.FromInt(0))
@ -454,14 +461,14 @@ func TestGetUnavailableNumbers(t *testing.T) {
},
{
name: "Two nodes, one node without pods, surge",
Manager: func() *daemonSetsController {
manager, _, _, err := newTestController()
ManagerFunc: func(ctx context.Context) *daemonSetsController {
manager, _, _, err := newTestController(ctx)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
addNodes(manager.nodeStore, 0, 2, nil)
return manager
}(),
},
ds: func() *apps.DaemonSet {
ds := newDaemonSet("x")
ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(0))
@ -479,14 +486,14 @@ func TestGetUnavailableNumbers(t *testing.T) {
},
{
name: "Two nodes with pods, MaxUnavailable in percents",
Manager: func() *daemonSetsController {
manager, _, _, err := newTestController()
ManagerFunc: func(ctx context.Context) *daemonSetsController {
manager, _, _, err := newTestController(ctx)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
addNodes(manager.nodeStore, 0, 2, nil)
return manager
}(),
},
ds: func() *apps.DaemonSet {
ds := newDaemonSet("x")
ds.Spec.UpdateStrategy = newUpdateUnavailable(intstr.FromString("50%"))
@ -507,14 +514,14 @@ func TestGetUnavailableNumbers(t *testing.T) {
},
{
name: "Two nodes with pods, MaxUnavailable in percents, surge",
Manager: func() *daemonSetsController {
manager, _, _, err := newTestController()
ManagerFunc: func(ctx context.Context) *daemonSetsController {
manager, _, _, err := newTestController(ctx)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
addNodes(manager.nodeStore, 0, 2, nil)
return manager
}(),
},
ds: func() *apps.DaemonSet {
ds := newDaemonSet("x")
ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromString("50%"))
@ -536,14 +543,14 @@ func TestGetUnavailableNumbers(t *testing.T) {
},
{
name: "Two nodes with pods, MaxUnavailable is 100%, surge",
Manager: func() *daemonSetsController {
manager, _, _, err := newTestController()
ManagerFunc: func(ctx context.Context) *daemonSetsController {
manager, _, _, err := newTestController(ctx)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
addNodes(manager.nodeStore, 0, 2, nil)
return manager
}(),
},
ds: func() *apps.DaemonSet {
ds := newDaemonSet("x")
ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromString("100%"))
@ -565,14 +572,14 @@ func TestGetUnavailableNumbers(t *testing.T) {
},
{
name: "Two nodes with pods, MaxUnavailable in percents, pod terminating",
Manager: func() *daemonSetsController {
manager, _, _, err := newTestController()
ManagerFunc: func(ctx context.Context) *daemonSetsController {
manager, _, _, err := newTestController(ctx)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
addNodes(manager.nodeStore, 0, 3, nil)
return manager
}(),
},
ds: func() *apps.DaemonSet {
ds := newDaemonSet("x")
ds.Spec.UpdateStrategy = newUpdateUnavailable(intstr.FromString("50%"))
@ -597,12 +604,14 @@ func TestGetUnavailableNumbers(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
c.Manager.dsStore.Add(c.ds)
nodeList, err := c.Manager.nodeLister.List(labels.Everything())
_, ctx := ktesting.NewTestContext(t)
manager := c.ManagerFunc(ctx)
manager.dsStore.Add(c.ds)
nodeList, err := manager.nodeLister.List(labels.Everything())
if err != nil {
t.Fatalf("error listing nodes: %v", err)
}
maxSurge, maxUnavailable, err := c.Manager.updatedDesiredNodeCounts(c.ds, nodeList, c.nodeToPods)
maxSurge, maxUnavailable, err := manager.updatedDesiredNodeCounts(ctx, c.ds, nodeList, c.nodeToPods)
if err != nil && c.Err != nil {
if c.Err != err {
t.Fatalf("Expected error: %v but got: %v", c.Err, err)
@ -635,42 +644,44 @@ func TestControlledHistories(t *testing.T) {
orphanCrNotInSameNsWithDs1 := newControllerRevision(ds1.GetName()+"-x3", ds1.GetNamespace()+"-other", ds1.Spec.Template.Labels, nil)
cases := []struct {
name string
manager *daemonSetsController
managerFunc func(ctx context.Context) *daemonSetsController
historyCRAll []*apps.ControllerRevision
expectControllerRevisions []*apps.ControllerRevision
}{
{
name: "controller revision in the same namespace",
manager: func() *daemonSetsController {
manager, _, _, err := newTestController(ds1, crOfDs1, orphanCrInSameNsWithDs1)
managerFunc: func(ctx context.Context) *daemonSetsController {
manager, _, _, err := newTestController(ctx, ds1, crOfDs1, orphanCrInSameNsWithDs1)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
manager.dsStore.Add(ds1)
return manager
}(),
},
historyCRAll: []*apps.ControllerRevision{crOfDs1, orphanCrInSameNsWithDs1},
expectControllerRevisions: []*apps.ControllerRevision{crOfDs1, orphanCrInSameNsWithDs1},
},
{
name: "Skip adopting the controller revision in namespace other than the one in which DS lives",
manager: func() *daemonSetsController {
manager, _, _, err := newTestController(ds1, orphanCrNotInSameNsWithDs1)
managerFunc: func(ctx context.Context) *daemonSetsController {
manager, _, _, err := newTestController(ctx, ds1, orphanCrNotInSameNsWithDs1)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
manager.dsStore.Add(ds1)
return manager
}(),
},
historyCRAll: []*apps.ControllerRevision{orphanCrNotInSameNsWithDs1},
expectControllerRevisions: []*apps.ControllerRevision{},
},
}
for _, c := range cases {
_, ctx := ktesting.NewTestContext(t)
manager := c.managerFunc(ctx)
for _, h := range c.historyCRAll {
c.manager.historyStore.Add(h)
manager.historyStore.Add(h)
}
crList, err := c.manager.controlledHistories(context.TODO(), ds1)
crList, err := manager.controlledHistories(context.TODO(), ds1)
if err != nil {
t.Fatalf("Test case: %s. Unexpected error: %v", c.name, err)
}

View File

@ -39,6 +39,7 @@ import (
"k8s.io/client-go/tools/events"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
@ -75,8 +76,11 @@ func setupWithServerSetup(t *testing.T, serverSetup framework.TestServerSetup) (
clientSet, config, closeFn := framework.StartTestServer(t, serverSetup)
resyncPeriod := 12 * time.Hour
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "daemonset-informers")), resyncPeriod)
dc, err := daemon.NewDaemonSetsController(
ctx,
informers.Apps().V1().DaemonSets(),
informers.Apps().V1().ControllerRevisions(),
informers.Core().V1().Pods(),
@ -88,8 +92,6 @@ func setupWithServerSetup(t *testing.T, serverSetup framework.TestServerSetup) (
t.Fatalf("error creating DaemonSets controller: %v", err)
}
ctx, cancel := context.WithCancel(context.Background())
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
Interface: clientSet.EventsV1(),
})