mirror of
				https://github.com/k3s-io/kubernetes.git
				synced 2025-10-25 01:20:18 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			1527 lines
		
	
	
		
			55 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			1527 lines
		
	
	
		
			55 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| /*
 | |
| Copyright 2015 The Kubernetes Authors.
 | |
| 
 | |
| Licensed under the Apache License, Version 2.0 (the "License");
 | |
| you may not use this file except in compliance with the License.
 | |
| You may obtain a copy of the License at
 | |
| 
 | |
|     http://www.apache.org/licenses/LICENSE-2.0
 | |
| 
 | |
| Unless required by applicable law or agreed to in writing, software
 | |
| distributed under the License is distributed on an "AS IS" BASIS,
 | |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| See the License for the specific language governing permissions and
 | |
| limitations under the License.
 | |
| */
 | |
| 
 | |
| package daemon
 | |
| 
 | |
| import (
 | |
| 	"fmt"
 | |
| 	"reflect"
 | |
| 	"sort"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"k8s.io/klog"
 | |
| 
 | |
| 	apps "k8s.io/api/apps/v1"
 | |
| 	"k8s.io/api/core/v1"
 | |
| 	apiequality "k8s.io/apimachinery/pkg/api/equality"
 | |
| 	"k8s.io/apimachinery/pkg/api/errors"
 | |
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | |
| 	"k8s.io/apimachinery/pkg/labels"
 | |
| 	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 | |
| 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 | |
| 	"k8s.io/apimachinery/pkg/util/sets"
 | |
| 	"k8s.io/apimachinery/pkg/util/wait"
 | |
| 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 | |
| 	appsinformers "k8s.io/client-go/informers/apps/v1"
 | |
| 	coreinformers "k8s.io/client-go/informers/core/v1"
 | |
| 	clientset "k8s.io/client-go/kubernetes"
 | |
| 	"k8s.io/client-go/kubernetes/scheme"
 | |
| 	unversionedapps "k8s.io/client-go/kubernetes/typed/apps/v1"
 | |
| 	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
 | |
| 	appslisters "k8s.io/client-go/listers/apps/v1"
 | |
| 	corelisters "k8s.io/client-go/listers/core/v1"
 | |
| 	"k8s.io/client-go/tools/cache"
 | |
| 	"k8s.io/client-go/tools/record"
 | |
| 	"k8s.io/client-go/util/flowcontrol"
 | |
| 	"k8s.io/client-go/util/integer"
 | |
| 	"k8s.io/client-go/util/workqueue"
 | |
| 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
 | |
| 	"k8s.io/kubernetes/pkg/controller"
 | |
| 	"k8s.io/kubernetes/pkg/controller/daemon/util"
 | |
| 	"k8s.io/kubernetes/pkg/features"
 | |
| 	kubelettypes "k8s.io/kubernetes/pkg/kubelet/types"
 | |
| 	"k8s.io/kubernetes/pkg/scheduler/algorithm"
 | |
| 	"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
 | |
| 	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
 | |
| 	"k8s.io/kubernetes/pkg/util/metrics"
 | |
| )
 | |
| 
 | |
| const (
 | |
| 	// BurstReplicas is a rate limiter for booting pods on a lot of pods.
 | |
| 	// The value of 250 is chosen b/c values that are too high can cause registry DoS issues.
 | |
| 	BurstReplicas = 250
 | |
| 
 | |
| 	// StatusUpdateRetries limits the number of retries if sending a status update to API server fails.
 | |
| 	StatusUpdateRetries = 1
 | |
| 
 | |
| 	// BackoffGCInterval is the time that has to pass before next iteration of backoff GC is run
 | |
| 	BackoffGCInterval = 1 * time.Minute
 | |
| )
 | |
| 
 | |
| // Reasons for DaemonSet events
 | |
| const (
 | |
| 	// SelectingAllReason is added to an event when a DaemonSet selects all Pods.
 | |
| 	SelectingAllReason = "SelectingAll"
 | |
| 	// FailedPlacementReason is added to an event when a DaemonSet can't schedule a Pod to a specified node.
 | |
| 	FailedPlacementReason = "FailedPlacement"
 | |
| 	// FailedDaemonPodReason is added to an event when the status of a Pod of a DaemonSet is 'Failed'.
 | |
| 	FailedDaemonPodReason = "FailedDaemonPod"
 | |
| )
 | |
| 
 | |
| // controllerKind contains the schema.GroupVersionKind for this controller type.
 | |
| var controllerKind = apps.SchemeGroupVersion.WithKind("DaemonSet")
 | |
| 
 | |
| // DaemonSetsController is responsible for synchronizing DaemonSet objects stored
 | |
| // in the system with actual running pods.
 | |
| type DaemonSetsController struct {
 | |
| 	kubeClient    clientset.Interface
 | |
| 	eventRecorder record.EventRecorder
 | |
| 	podControl    controller.PodControlInterface
 | |
| 	crControl     controller.ControllerRevisionControlInterface
 | |
| 
 | |
| 	// An dsc is temporarily suspended after creating/deleting these many replicas.
 | |
| 	// It resumes normal action after observing the watch events for them.
 | |
| 	burstReplicas int
 | |
| 
 | |
| 	// To allow injection of syncDaemonSet for testing.
 | |
| 	syncHandler func(dsKey string) error
 | |
| 	// used for unit testing
 | |
| 	enqueueDaemonSet            func(ds *apps.DaemonSet)
 | |
| 	enqueueDaemonSetRateLimited func(ds *apps.DaemonSet)
 | |
| 	// A TTLCache of pod creates/deletes each ds expects to see
 | |
| 	expectations controller.ControllerExpectationsInterface
 | |
| 	// dsLister can list/get daemonsets from the shared informer's store
 | |
| 	dsLister appslisters.DaemonSetLister
 | |
| 	// dsStoreSynced returns true if the daemonset store has been synced at least once.
 | |
| 	// Added as a member to the struct to allow injection for testing.
 | |
| 	dsStoreSynced cache.InformerSynced
 | |
| 	// historyLister get list/get history from the shared informers's store
 | |
| 	historyLister appslisters.ControllerRevisionLister
 | |
| 	// historyStoreSynced returns true if the history store has been synced at least once.
 | |
| 	// Added as a member to the struct to allow injection for testing.
 | |
| 	historyStoreSynced cache.InformerSynced
 | |
| 	// podLister get list/get pods from the shared informers's store
 | |
| 	podLister corelisters.PodLister
 | |
| 	// podNodeIndex indexes pods by their nodeName
 | |
| 	podNodeIndex cache.Indexer
 | |
| 	// podStoreSynced returns true if the pod store has been synced at least once.
 | |
| 	// Added as a member to the struct to allow injection for testing.
 | |
| 	podStoreSynced cache.InformerSynced
 | |
| 	// nodeLister can list/get nodes from the shared informer's store
 | |
| 	nodeLister corelisters.NodeLister
 | |
| 	// nodeStoreSynced returns true if the node store has been synced at least once.
 | |
| 	// Added as a member to the struct to allow injection for testing.
 | |
| 	nodeStoreSynced cache.InformerSynced
 | |
| 
 | |
| 	// DaemonSet keys that need to be synced.
 | |
| 	queue workqueue.RateLimitingInterface
 | |
| 
 | |
| 	// The DaemonSet that has suspended pods on nodes; the key is node name, the value
 | |
| 	// is DaemonSet set that want to run pods but can't schedule in latest syncup cycle.
 | |
| 	suspendedDaemonPodsMutex sync.Mutex
 | |
| 	suspendedDaemonPods      map[string]sets.String
 | |
| 
 | |
| 	failedPodsBackoff *flowcontrol.Backoff
 | |
| }
 | |
| 
 | |
| // NewDaemonSetsController creates a new DaemonSetsController
 | |
| func NewDaemonSetsController(
 | |
| 	daemonSetInformer appsinformers.DaemonSetInformer,
 | |
| 	historyInformer appsinformers.ControllerRevisionInformer,
 | |
| 	podInformer coreinformers.PodInformer,
 | |
| 	nodeInformer coreinformers.NodeInformer,
 | |
| 	kubeClient clientset.Interface,
 | |
| 	failedPodsBackoff *flowcontrol.Backoff,
 | |
| ) (*DaemonSetsController, error) {
 | |
| 	eventBroadcaster := record.NewBroadcaster()
 | |
| 	eventBroadcaster.StartLogging(klog.Infof)
 | |
| 	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
 | |
| 
 | |
| 	if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
 | |
| 		if err := metrics.RegisterMetricAndTrackRateLimiterUsage("daemon_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	}
 | |
| 	dsc := &DaemonSetsController{
 | |
| 		kubeClient:    kubeClient,
 | |
| 		eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
 | |
| 		podControl: controller.RealPodControl{
 | |
| 			KubeClient: kubeClient,
 | |
| 			Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
 | |
| 		},
 | |
| 		crControl: controller.RealControllerRevisionControl{
 | |
| 			KubeClient: kubeClient,
 | |
| 		},
 | |
| 		burstReplicas:       BurstReplicas,
 | |
| 		expectations:        controller.NewControllerExpectations(),
 | |
| 		queue:               workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "daemonset"),
 | |
| 		suspendedDaemonPods: map[string]sets.String{},
 | |
| 	}
 | |
| 
 | |
| 	daemonSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 | |
| 		AddFunc: func(obj interface{}) {
 | |
| 			ds := obj.(*apps.DaemonSet)
 | |
| 			klog.V(4).Infof("Adding daemon set %s", ds.Name)
 | |
| 			dsc.enqueueDaemonSet(ds)
 | |
| 		},
 | |
| 		UpdateFunc: func(old, cur interface{}) {
 | |
| 			oldDS := old.(*apps.DaemonSet)
 | |
| 			curDS := cur.(*apps.DaemonSet)
 | |
| 			klog.V(4).Infof("Updating daemon set %s", oldDS.Name)
 | |
| 			dsc.enqueueDaemonSet(curDS)
 | |
| 		},
 | |
| 		DeleteFunc: dsc.deleteDaemonset,
 | |
| 	})
 | |
| 	dsc.dsLister = daemonSetInformer.Lister()
 | |
| 	dsc.dsStoreSynced = daemonSetInformer.Informer().HasSynced
 | |
| 
 | |
| 	historyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 | |
| 		AddFunc:    dsc.addHistory,
 | |
| 		UpdateFunc: dsc.updateHistory,
 | |
| 		DeleteFunc: dsc.deleteHistory,
 | |
| 	})
 | |
| 	dsc.historyLister = historyInformer.Lister()
 | |
| 	dsc.historyStoreSynced = historyInformer.Informer().HasSynced
 | |
| 
 | |
| 	// Watch for creation/deletion of pods. The reason we watch is that we don't want a daemon set to create/delete
 | |
| 	// more pods until all the effects (expectations) of a daemon set's create/delete have been observed.
 | |
| 	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 | |
| 		AddFunc:    dsc.addPod,
 | |
| 		UpdateFunc: dsc.updatePod,
 | |
| 		DeleteFunc: dsc.deletePod,
 | |
| 	})
 | |
| 	dsc.podLister = podInformer.Lister()
 | |
| 
 | |
| 	// This custom indexer will index pods based on their NodeName which will decrease the amount of pods we need to get in simulate() call.
 | |
| 	podInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{
 | |
| 		"nodeName": indexByPodNodeName,
 | |
| 	})
 | |
| 	dsc.podNodeIndex = podInformer.Informer().GetIndexer()
 | |
| 	dsc.podStoreSynced = podInformer.Informer().HasSynced
 | |
| 
 | |
| 	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 | |
| 		AddFunc:    dsc.addNode,
 | |
| 		UpdateFunc: dsc.updateNode,
 | |
| 	},
 | |
| 	)
 | |
| 	dsc.nodeStoreSynced = nodeInformer.Informer().HasSynced
 | |
| 	dsc.nodeLister = nodeInformer.Lister()
 | |
| 
 | |
| 	dsc.syncHandler = dsc.syncDaemonSet
 | |
| 	dsc.enqueueDaemonSet = dsc.enqueue
 | |
| 	dsc.enqueueDaemonSetRateLimited = dsc.enqueueRateLimited
 | |
| 
 | |
| 	dsc.failedPodsBackoff = failedPodsBackoff
 | |
| 
 | |
| 	return dsc, nil
 | |
| }
 | |
| 
 | |
| func indexByPodNodeName(obj interface{}) ([]string, error) {
 | |
| 	pod, ok := obj.(*v1.Pod)
 | |
| 	if !ok {
 | |
| 		return []string{}, nil
 | |
| 	}
 | |
| 	// We are only interested in active pods with nodeName set
 | |
| 	if len(pod.Spec.NodeName) == 0 || pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
 | |
| 		return []string{}, nil
 | |
| 	}
 | |
| 	return []string{pod.Spec.NodeName}, nil
 | |
| }
 | |
| 
 | |
| func (dsc *DaemonSetsController) deleteDaemonset(obj interface{}) {
 | |
| 	ds, ok := obj.(*apps.DaemonSet)
 | |
| 	if !ok {
 | |
| 		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
 | |
| 		if !ok {
 | |
| 			utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
 | |
| 			return
 | |
| 		}
 | |
| 		ds, ok = tombstone.Obj.(*apps.DaemonSet)
 | |
| 		if !ok {
 | |
| 			utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a DaemonSet %#v", obj))
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| 	klog.V(4).Infof("Deleting daemon set %s", ds.Name)
 | |
| 	dsc.enqueueDaemonSet(ds)
 | |
| }
 | |
| 
 | |
| // Run begins watching and syncing daemon sets.
 | |
| func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
 | |
| 	defer utilruntime.HandleCrash()
 | |
| 	defer dsc.queue.ShutDown()
 | |
| 
 | |
| 	klog.Infof("Starting daemon sets controller")
 | |
| 	defer klog.Infof("Shutting down daemon sets controller")
 | |
| 
 | |
| 	if !controller.WaitForCacheSync("daemon sets", stopCh, dsc.podStoreSynced, dsc.nodeStoreSynced, dsc.historyStoreSynced, dsc.dsStoreSynced) {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	for i := 0; i < workers; i++ {
 | |
| 		go wait.Until(dsc.runWorker, time.Second, stopCh)
 | |
| 	}
 | |
| 
 | |
| 	go wait.Until(dsc.failedPodsBackoff.GC, BackoffGCInterval, stopCh)
 | |
| 
 | |
| 	<-stopCh
 | |
| }
 | |
| 
 | |
| func (dsc *DaemonSetsController) runWorker() {
 | |
| 	for dsc.processNextWorkItem() {
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // processNextWorkItem deals with one key off the queue.  It returns false when it's time to quit.
 | |
| func (dsc *DaemonSetsController) processNextWorkItem() bool {
 | |
| 	dsKey, quit := dsc.queue.Get()
 | |
| 	if quit {
 | |
| 		return false
 | |
| 	}
 | |
| 	defer dsc.queue.Done(dsKey)
 | |
| 
 | |
| 	err := dsc.syncHandler(dsKey.(string))
 | |
| 	if err == nil {
 | |
| 		dsc.queue.Forget(dsKey)
 | |
| 		return true
 | |
| 	}
 | |
| 
 | |
| 	utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err))
 | |
| 	dsc.queue.AddRateLimited(dsKey)
 | |
| 
 | |
| 	return true
 | |
| }
 | |
| 
 | |
| func (dsc *DaemonSetsController) enqueue(ds *apps.DaemonSet) {
 | |
| 	key, err := controller.KeyFunc(ds)
 | |
| 	if err != nil {
 | |
| 		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", ds, err))
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// TODO: Handle overlapping controllers better. See comment in ReplicationManager.
 | |
| 	dsc.queue.Add(key)
 | |
| }
 | |
| 
 | |
| func (dsc *DaemonSetsController) enqueueRateLimited(ds *apps.DaemonSet) {
 | |
| 	key, err := controller.KeyFunc(ds)
 | |
| 	if err != nil {
 | |
| 		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", ds, err))
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	dsc.queue.AddRateLimited(key)
 | |
| }
 | |
| 
 | |
| func (dsc *DaemonSetsController) enqueueDaemonSetAfter(obj interface{}, after time.Duration) {
 | |
| 	key, err := controller.KeyFunc(obj)
 | |
| 	if err != nil {
 | |
| 		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// TODO: Handle overlapping controllers better. See comment in ReplicationManager.
 | |
| 	dsc.queue.AddAfter(key, after)
 | |
| }
 | |
| 
 | |
| // getDaemonSetsForPod returns a list of DaemonSets that potentially match the pod.
 | |
| func (dsc *DaemonSetsController) getDaemonSetsForPod(pod *v1.Pod) []*apps.DaemonSet {
 | |
| 	sets, err := dsc.dsLister.GetPodDaemonSets(pod)
 | |
| 	if err != nil {
 | |
| 		return nil
 | |
| 	}
 | |
| 	if len(sets) > 1 {
 | |
| 		// ControllerRef will ensure we don't do anything crazy, but more than one
 | |
| 		// item in this list nevertheless constitutes user error.
 | |
| 		utilruntime.HandleError(fmt.Errorf("user error! more than one daemon is selecting pods with labels: %+v", pod.Labels))
 | |
| 	}
 | |
| 	return sets
 | |
| }
 | |
| 
 | |
| // getDaemonSetsForHistory returns a list of DaemonSets that potentially
 | |
| // match a ControllerRevision.
 | |
| func (dsc *DaemonSetsController) getDaemonSetsForHistory(history *apps.ControllerRevision) []*apps.DaemonSet {
 | |
| 	daemonSets, err := dsc.dsLister.GetHistoryDaemonSets(history)
 | |
| 	if err != nil || len(daemonSets) == 0 {
 | |
| 		return nil
 | |
| 	}
 | |
| 	if len(daemonSets) > 1 {
 | |
| 		// ControllerRef will ensure we don't do anything crazy, but more than one
 | |
| 		// item in this list nevertheless constitutes user error.
 | |
| 		klog.V(4).Infof("User error! more than one DaemonSets is selecting ControllerRevision %s/%s with labels: %#v",
 | |
| 			history.Namespace, history.Name, history.Labels)
 | |
| 	}
 | |
| 	return daemonSets
 | |
| }
 | |
| 
 | |
| // addHistory enqueues the DaemonSet that manages a ControllerRevision when the ControllerRevision is created
 | |
| // or when the controller manager is restarted.
 | |
| func (dsc *DaemonSetsController) addHistory(obj interface{}) {
 | |
| 	history := obj.(*apps.ControllerRevision)
 | |
| 	if history.DeletionTimestamp != nil {
 | |
| 		// On a restart of the controller manager, it's possible for an object to
 | |
| 		// show up in a state that is already pending deletion.
 | |
| 		dsc.deleteHistory(history)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// If it has a ControllerRef, that's all that matters.
 | |
| 	if controllerRef := metav1.GetControllerOf(history); controllerRef != nil {
 | |
| 		ds := dsc.resolveControllerRef(history.Namespace, controllerRef)
 | |
| 		if ds == nil {
 | |
| 			return
 | |
| 		}
 | |
| 		klog.V(4).Infof("ControllerRevision %s added.", history.Name)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// Otherwise, it's an orphan. Get a list of all matching DaemonSets and sync
 | |
| 	// them to see if anyone wants to adopt it.
 | |
| 	daemonSets := dsc.getDaemonSetsForHistory(history)
 | |
| 	if len(daemonSets) == 0 {
 | |
| 		return
 | |
| 	}
 | |
| 	klog.V(4).Infof("Orphan ControllerRevision %s added.", history.Name)
 | |
| 	for _, ds := range daemonSets {
 | |
| 		dsc.enqueueDaemonSet(ds)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // updateHistory figures out what DaemonSet(s) manage a ControllerRevision when the ControllerRevision
 | |
| // is updated and wake them up. If anything of the ControllerRevision has changed, we need to  awaken
 | |
| // both the old and new DaemonSets.
 | |
| func (dsc *DaemonSetsController) updateHistory(old, cur interface{}) {
 | |
| 	curHistory := cur.(*apps.ControllerRevision)
 | |
| 	oldHistory := old.(*apps.ControllerRevision)
 | |
| 	if curHistory.ResourceVersion == oldHistory.ResourceVersion {
 | |
| 		// Periodic resync will send update events for all known ControllerRevisions.
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	curControllerRef := metav1.GetControllerOf(curHistory)
 | |
| 	oldControllerRef := metav1.GetControllerOf(oldHistory)
 | |
| 	controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
 | |
| 	if controllerRefChanged && oldControllerRef != nil {
 | |
| 		// The ControllerRef was changed. Sync the old controller, if any.
 | |
| 		if ds := dsc.resolveControllerRef(oldHistory.Namespace, oldControllerRef); ds != nil {
 | |
| 			dsc.enqueueDaemonSet(ds)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// If it has a ControllerRef, that's all that matters.
 | |
| 	if curControllerRef != nil {
 | |
| 		ds := dsc.resolveControllerRef(curHistory.Namespace, curControllerRef)
 | |
| 		if ds == nil {
 | |
| 			return
 | |
| 		}
 | |
| 		klog.V(4).Infof("ControllerRevision %s updated.", curHistory.Name)
 | |
| 		dsc.enqueueDaemonSet(ds)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// Otherwise, it's an orphan. If anything changed, sync matching controllers
 | |
| 	// to see if anyone wants to adopt it now.
 | |
| 	labelChanged := !reflect.DeepEqual(curHistory.Labels, oldHistory.Labels)
 | |
| 	if labelChanged || controllerRefChanged {
 | |
| 		daemonSets := dsc.getDaemonSetsForHistory(curHistory)
 | |
| 		if len(daemonSets) == 0 {
 | |
| 			return
 | |
| 		}
 | |
| 		klog.V(4).Infof("Orphan ControllerRevision %s updated.", curHistory.Name)
 | |
| 		for _, ds := range daemonSets {
 | |
| 			dsc.enqueueDaemonSet(ds)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // deleteHistory enqueues the DaemonSet that manages a ControllerRevision when
 | |
| // the ControllerRevision is deleted. obj could be an *app.ControllerRevision, or
 | |
| // a DeletionFinalStateUnknown marker item.
 | |
| func (dsc *DaemonSetsController) deleteHistory(obj interface{}) {
 | |
| 	history, ok := obj.(*apps.ControllerRevision)
 | |
| 
 | |
| 	// When a delete is dropped, the relist will notice a ControllerRevision in the store not
 | |
| 	// in the list, leading to the insertion of a tombstone object which contains
 | |
| 	// the deleted key/value. Note that this value might be stale. If the ControllerRevision
 | |
| 	// changed labels the new DaemonSet will not be woken up till the periodic resync.
 | |
| 	if !ok {
 | |
| 		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
 | |
| 		if !ok {
 | |
| 			utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
 | |
| 			return
 | |
| 		}
 | |
| 		history, ok = tombstone.Obj.(*apps.ControllerRevision)
 | |
| 		if !ok {
 | |
| 			utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a ControllerRevision %#v", obj))
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	controllerRef := metav1.GetControllerOf(history)
 | |
| 	if controllerRef == nil {
 | |
| 		// No controller should care about orphans being deleted.
 | |
| 		return
 | |
| 	}
 | |
| 	ds := dsc.resolveControllerRef(history.Namespace, controllerRef)
 | |
| 	if ds == nil {
 | |
| 		return
 | |
| 	}
 | |
| 	klog.V(4).Infof("ControllerRevision %s deleted.", history.Name)
 | |
| 	dsc.enqueueDaemonSet(ds)
 | |
| }
 | |
| 
 | |
| func (dsc *DaemonSetsController) addPod(obj interface{}) {
 | |
| 	pod := obj.(*v1.Pod)
 | |
| 
 | |
| 	if pod.DeletionTimestamp != nil {
 | |
| 		// on a restart of the controller manager, it's possible a new pod shows up in a state that
 | |
| 		// is already pending deletion. Prevent the pod from being a creation observation.
 | |
| 		dsc.deletePod(pod)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// If it has a ControllerRef, that's all that matters.
 | |
| 	if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
 | |
| 		ds := dsc.resolveControllerRef(pod.Namespace, controllerRef)
 | |
| 		if ds == nil {
 | |
| 			return
 | |
| 		}
 | |
| 		dsKey, err := controller.KeyFunc(ds)
 | |
| 		if err != nil {
 | |
| 			return
 | |
| 		}
 | |
| 		klog.V(4).Infof("Pod %s added.", pod.Name)
 | |
| 		dsc.expectations.CreationObserved(dsKey)
 | |
| 		dsc.enqueueDaemonSet(ds)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// Otherwise, it's an orphan. Get a list of all matching DaemonSets and sync
 | |
| 	// them to see if anyone wants to adopt it.
 | |
| 	// DO NOT observe creation because no controller should be waiting for an
 | |
| 	// orphan.
 | |
| 	dss := dsc.getDaemonSetsForPod(pod)
 | |
| 	if len(dss) == 0 {
 | |
| 		return
 | |
| 	}
 | |
| 	klog.V(4).Infof("Orphan Pod %s added.", pod.Name)
 | |
| 	for _, ds := range dss {
 | |
| 		dsc.enqueueDaemonSet(ds)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // When a pod is updated, figure out what sets manage it and wake them
 | |
| // up. If the labels of the pod have changed we need to awaken both the old
 | |
| // and new set. old and cur must be *v1.Pod types.
 | |
| func (dsc *DaemonSetsController) updatePod(old, cur interface{}) {
 | |
| 	curPod := cur.(*v1.Pod)
 | |
| 	oldPod := old.(*v1.Pod)
 | |
| 	if curPod.ResourceVersion == oldPod.ResourceVersion {
 | |
| 		// Periodic resync will send update events for all known pods.
 | |
| 		// Two different versions of the same pod will always have different RVs.
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	curControllerRef := metav1.GetControllerOf(curPod)
 | |
| 	oldControllerRef := metav1.GetControllerOf(oldPod)
 | |
| 	controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
 | |
| 	if controllerRefChanged && oldControllerRef != nil {
 | |
| 		// The ControllerRef was changed. Sync the old controller, if any.
 | |
| 		if ds := dsc.resolveControllerRef(oldPod.Namespace, oldControllerRef); ds != nil {
 | |
| 			dsc.enqueueDaemonSet(ds)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// If it has a ControllerRef, that's all that matters.
 | |
| 	if curControllerRef != nil {
 | |
| 		ds := dsc.resolveControllerRef(curPod.Namespace, curControllerRef)
 | |
| 		if ds == nil {
 | |
| 			return
 | |
| 		}
 | |
| 		klog.V(4).Infof("Pod %s updated.", curPod.Name)
 | |
| 		dsc.enqueueDaemonSet(ds)
 | |
| 		changedToReady := !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod)
 | |
| 		// See https://github.com/kubernetes/kubernetes/pull/38076 for more details
 | |
| 		if changedToReady && ds.Spec.MinReadySeconds > 0 {
 | |
| 			// Add a second to avoid milliseconds skew in AddAfter.
 | |
| 			// See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info.
 | |
| 			dsc.enqueueDaemonSetAfter(ds, (time.Duration(ds.Spec.MinReadySeconds)*time.Second)+time.Second)
 | |
| 		}
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// Otherwise, it's an orphan. If anything changed, sync matching controllers
 | |
| 	// to see if anyone wants to adopt it now.
 | |
| 	dss := dsc.getDaemonSetsForPod(curPod)
 | |
| 	if len(dss) == 0 {
 | |
| 		return
 | |
| 	}
 | |
| 	klog.V(4).Infof("Orphan Pod %s updated.", curPod.Name)
 | |
| 	labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
 | |
| 	if labelChanged || controllerRefChanged {
 | |
| 		for _, ds := range dss {
 | |
| 			dsc.enqueueDaemonSet(ds)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // listSuspendedDaemonPods lists the Daemon pods that 'want to run, but should not schedule'
 | |
| // for the node.
 | |
| func (dsc *DaemonSetsController) listSuspendedDaemonPods(node string) (dss []string) {
 | |
| 	dsc.suspendedDaemonPodsMutex.Lock()
 | |
| 	defer dsc.suspendedDaemonPodsMutex.Unlock()
 | |
| 
 | |
| 	if _, found := dsc.suspendedDaemonPods[node]; !found {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	for k := range dsc.suspendedDaemonPods[node] {
 | |
| 		dss = append(dss, k)
 | |
| 	}
 | |
| 	return
 | |
| }
 | |
| 
 | |
| // requeueSuspendedDaemonPods enqueues all DaemonSets which has pods that 'want to run,
 | |
| // but should not schedule' for the node; so DaemonSetController will sync up them again.
 | |
| func (dsc *DaemonSetsController) requeueSuspendedDaemonPods(node string) {
 | |
| 	dss := dsc.listSuspendedDaemonPods(node)
 | |
| 	for _, dsKey := range dss {
 | |
| 		if ns, name, err := cache.SplitMetaNamespaceKey(dsKey); err != nil {
 | |
| 			klog.Errorf("Failed to get DaemonSet's namespace and name from %s: %v", dsKey, err)
 | |
| 			continue
 | |
| 		} else if ds, err := dsc.dsLister.DaemonSets(ns).Get(name); err != nil {
 | |
| 			klog.Errorf("Failed to get DaemonSet %s/%s: %v", ns, name, err)
 | |
| 			continue
 | |
| 		} else {
 | |
| 			dsc.enqueueDaemonSetRateLimited(ds)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // addSuspendedDaemonPods adds DaemonSet which has pods that 'want to run,
 | |
| // but should not schedule' for the node to the suspended queue.
 | |
| func (dsc *DaemonSetsController) addSuspendedDaemonPods(node, ds string) {
 | |
| 	dsc.suspendedDaemonPodsMutex.Lock()
 | |
| 	defer dsc.suspendedDaemonPodsMutex.Unlock()
 | |
| 
 | |
| 	if _, found := dsc.suspendedDaemonPods[node]; !found {
 | |
| 		dsc.suspendedDaemonPods[node] = sets.NewString()
 | |
| 	}
 | |
| 	dsc.suspendedDaemonPods[node].Insert(ds)
 | |
| }
 | |
| 
 | |
| // removeSuspendedDaemonPods removes DaemonSet which has pods that 'want to run,
 | |
| // but should not schedule' for the node from suspended queue.
 | |
| func (dsc *DaemonSetsController) removeSuspendedDaemonPods(node, ds string) {
 | |
| 	dsc.suspendedDaemonPodsMutex.Lock()
 | |
| 	defer dsc.suspendedDaemonPodsMutex.Unlock()
 | |
| 
 | |
| 	if _, found := dsc.suspendedDaemonPods[node]; !found {
 | |
| 		return
 | |
| 	}
 | |
| 	dsc.suspendedDaemonPods[node].Delete(ds)
 | |
| 
 | |
| 	if len(dsc.suspendedDaemonPods[node]) == 0 {
 | |
| 		delete(dsc.suspendedDaemonPods, node)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (dsc *DaemonSetsController) deletePod(obj interface{}) {
 | |
| 	pod, ok := obj.(*v1.Pod)
 | |
| 	// When a delete is dropped, the relist will notice a pod in the store not
 | |
| 	// in the list, leading to the insertion of a tombstone object which contains
 | |
| 	// the deleted key/value. Note that this value might be stale. If the pod
 | |
| 	// changed labels the new daemonset will not be woken up till the periodic
 | |
| 	// resync.
 | |
| 	if !ok {
 | |
| 		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
 | |
| 		if !ok {
 | |
| 			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
 | |
| 			return
 | |
| 		}
 | |
| 		pod, ok = tombstone.Obj.(*v1.Pod)
 | |
| 		if !ok {
 | |
| 			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj))
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	controllerRef := metav1.GetControllerOf(pod)
 | |
| 	if controllerRef == nil {
 | |
| 		// No controller should care about orphans being deleted.
 | |
| 		if len(pod.Spec.NodeName) != 0 {
 | |
| 			// If scheduled pods were deleted, requeue suspended daemon pods.
 | |
| 			dsc.requeueSuspendedDaemonPods(pod.Spec.NodeName)
 | |
| 		}
 | |
| 		return
 | |
| 	}
 | |
| 	ds := dsc.resolveControllerRef(pod.Namespace, controllerRef)
 | |
| 	if ds == nil {
 | |
| 		if len(pod.Spec.NodeName) != 0 {
 | |
| 			// If scheduled pods were deleted, requeue suspended daemon pods.
 | |
| 			dsc.requeueSuspendedDaemonPods(pod.Spec.NodeName)
 | |
| 		}
 | |
| 		return
 | |
| 	}
 | |
| 	dsKey, err := controller.KeyFunc(ds)
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 	klog.V(4).Infof("Pod %s deleted.", pod.Name)
 | |
| 	dsc.expectations.DeletionObserved(dsKey)
 | |
| 	dsc.enqueueDaemonSet(ds)
 | |
| }
 | |
| 
 | |
| func (dsc *DaemonSetsController) addNode(obj interface{}) {
 | |
| 	// TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too).
 | |
| 	dsList, err := dsc.dsLister.List(labels.Everything())
 | |
| 	if err != nil {
 | |
| 		klog.V(4).Infof("Error enqueueing daemon sets: %v", err)
 | |
| 		return
 | |
| 	}
 | |
| 	node := obj.(*v1.Node)
 | |
| 	for _, ds := range dsList {
 | |
| 		_, shouldSchedule, _, err := dsc.nodeShouldRunDaemonPod(node, ds)
 | |
| 		if err != nil {
 | |
| 			continue
 | |
| 		}
 | |
| 		if shouldSchedule {
 | |
| 			dsc.enqueueDaemonSet(ds)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // nodeInSameCondition returns true if all effective types ("Status" is true) equals;
 | |
| // otherwise, returns false.
 | |
| func nodeInSameCondition(old []v1.NodeCondition, cur []v1.NodeCondition) bool {
 | |
| 	if len(old) == 0 && len(cur) == 0 {
 | |
| 		return true
 | |
| 	}
 | |
| 
 | |
| 	c1map := map[v1.NodeConditionType]v1.ConditionStatus{}
 | |
| 	for _, c := range old {
 | |
| 		if c.Status == v1.ConditionTrue {
 | |
| 			c1map[c.Type] = c.Status
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	for _, c := range cur {
 | |
| 		if c.Status != v1.ConditionTrue {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		if _, found := c1map[c.Type]; !found {
 | |
| 			return false
 | |
| 		}
 | |
| 
 | |
| 		delete(c1map, c.Type)
 | |
| 	}
 | |
| 
 | |
| 	return len(c1map) == 0
 | |
| }
 | |
| 
 | |
| func shouldIgnoreNodeUpdate(oldNode, curNode v1.Node) bool {
 | |
| 	if !nodeInSameCondition(oldNode.Status.Conditions, curNode.Status.Conditions) {
 | |
| 		return false
 | |
| 	}
 | |
| 	oldNode.ResourceVersion = curNode.ResourceVersion
 | |
| 	oldNode.Status.Conditions = curNode.Status.Conditions
 | |
| 	return apiequality.Semantic.DeepEqual(oldNode, curNode)
 | |
| }
 | |
| 
 | |
| func (dsc *DaemonSetsController) updateNode(old, cur interface{}) {
 | |
| 	oldNode := old.(*v1.Node)
 | |
| 	curNode := cur.(*v1.Node)
 | |
| 	if shouldIgnoreNodeUpdate(*oldNode, *curNode) {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	dsList, err := dsc.dsLister.List(labels.Everything())
 | |
| 	if err != nil {
 | |
| 		klog.V(4).Infof("Error listing daemon sets: %v", err)
 | |
| 		return
 | |
| 	}
 | |
| 	// TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too).
 | |
| 	for _, ds := range dsList {
 | |
| 		_, oldShouldSchedule, oldShouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(oldNode, ds)
 | |
| 		if err != nil {
 | |
| 			continue
 | |
| 		}
 | |
| 		_, currentShouldSchedule, currentShouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(curNode, ds)
 | |
| 		if err != nil {
 | |
| 			continue
 | |
| 		}
 | |
| 		if (oldShouldSchedule != currentShouldSchedule) || (oldShouldContinueRunning != currentShouldContinueRunning) {
 | |
| 			dsc.enqueueDaemonSet(ds)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // getDaemonPods returns daemon pods owned by the given ds.
 | |
| // This also reconciles ControllerRef by adopting/orphaning.
 | |
| // Note that returned Pods are pointers to objects in the cache.
 | |
| // If you want to modify one, you need to deep-copy it first.
 | |
| func (dsc *DaemonSetsController) getDaemonPods(ds *apps.DaemonSet) ([]*v1.Pod, error) {
 | |
| 	selector, err := metav1.LabelSelectorAsSelector(ds.Spec.Selector)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	// List all pods to include those that don't match the selector anymore but
 | |
| 	// have a ControllerRef pointing to this controller.
 | |
| 	pods, err := dsc.podLister.Pods(ds.Namespace).List(labels.Everything())
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	// If any adoptions are attempted, we should first recheck for deletion with
 | |
| 	// an uncached quorum read sometime after listing Pods (see #42639).
 | |
| 	dsNotDeleted := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
 | |
| 		fresh, err := dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace).Get(ds.Name, metav1.GetOptions{})
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		if fresh.UID != ds.UID {
 | |
| 			return nil, fmt.Errorf("original DaemonSet %v/%v is gone: got uid %v, wanted %v", ds.Namespace, ds.Name, fresh.UID, ds.UID)
 | |
| 		}
 | |
| 		return fresh, nil
 | |
| 	})
 | |
| 
 | |
| 	// Use ControllerRefManager to adopt/orphan as needed.
 | |
| 	cm := controller.NewPodControllerRefManager(dsc.podControl, ds, selector, controllerKind, dsNotDeleted)
 | |
| 	return cm.ClaimPods(pods)
 | |
| }
 | |
| 
 | |
| // getNodesToDaemonPods returns a map from nodes to daemon pods (corresponding to ds) created for the nodes.
 | |
| // This also reconciles ControllerRef by adopting/orphaning.
 | |
| // Note that returned Pods are pointers to objects in the cache.
 | |
| // If you want to modify one, you need to deep-copy it first.
 | |
| func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *apps.DaemonSet) (map[string][]*v1.Pod, error) {
 | |
| 	claimedPods, err := dsc.getDaemonPods(ds)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	// Group Pods by Node name.
 | |
| 	nodeToDaemonPods := make(map[string][]*v1.Pod)
 | |
| 	for _, pod := range claimedPods {
 | |
| 		nodeName, err := util.GetTargetNodeName(pod)
 | |
| 		if err != nil {
 | |
| 			klog.Warningf("Failed to get target node name of Pod %v/%v in DaemonSet %v/%v",
 | |
| 				pod.Namespace, pod.Name, ds.Namespace, ds.Name)
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		nodeToDaemonPods[nodeName] = append(nodeToDaemonPods[nodeName], pod)
 | |
| 	}
 | |
| 
 | |
| 	return nodeToDaemonPods, nil
 | |
| }
 | |
| 
 | |
| // resolveControllerRef returns the controller referenced by a ControllerRef,
 | |
| // or nil if the ControllerRef could not be resolved to a matching controller
 | |
| // of the correct Kind.
 | |
| func (dsc *DaemonSetsController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.DaemonSet {
 | |
| 	// We can't look up by UID, so look up by Name and then verify UID.
 | |
| 	// Don't even try to look up by Name if it's the wrong Kind.
 | |
| 	if controllerRef.Kind != controllerKind.Kind {
 | |
| 		return nil
 | |
| 	}
 | |
| 	ds, err := dsc.dsLister.DaemonSets(namespace).Get(controllerRef.Name)
 | |
| 	if err != nil {
 | |
| 		return nil
 | |
| 	}
 | |
| 	if ds.UID != controllerRef.UID {
 | |
| 		// The controller we found with this Name is not the same one that the
 | |
| 		// ControllerRef points to.
 | |
| 		return nil
 | |
| 	}
 | |
| 	return ds
 | |
| }
 | |
| 
 | |
| // podsShouldBeOnNode figures out the DaemonSet pods to be created and deleted on the given node:
 | |
| //   - nodesNeedingDaemonPods: the pods need to start on the node
 | |
| //   - podsToDelete: the Pods need to be deleted on the node
 | |
| //   - failedPodsObserved: the number of failed pods on node
 | |
| //   - err: unexpected error
 | |
| func (dsc *DaemonSetsController) podsShouldBeOnNode(
 | |
| 	node *v1.Node,
 | |
| 	nodeToDaemonPods map[string][]*v1.Pod,
 | |
| 	ds *apps.DaemonSet,
 | |
| ) (nodesNeedingDaemonPods, podsToDelete []string, failedPodsObserved int, err error) {
 | |
| 
 | |
| 	wantToRun, shouldSchedule, shouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(node, ds)
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	daemonPods, exists := nodeToDaemonPods[node.Name]
 | |
| 	dsKey, _ := cache.MetaNamespaceKeyFunc(ds)
 | |
| 
 | |
| 	dsc.removeSuspendedDaemonPods(node.Name, dsKey)
 | |
| 
 | |
| 	switch {
 | |
| 	case wantToRun && !shouldSchedule:
 | |
| 		// If daemon pod is supposed to run, but can not be scheduled, add to suspended list.
 | |
| 		dsc.addSuspendedDaemonPods(node.Name, dsKey)
 | |
| 	case shouldSchedule && !exists:
 | |
| 		// If daemon pod is supposed to be running on node, but isn't, create daemon pod.
 | |
| 		nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, node.Name)
 | |
| 	case shouldContinueRunning:
 | |
| 		// If a daemon pod failed, delete it
 | |
| 		// If there's non-daemon pods left on this node, we will create it in the next sync loop
 | |
| 		var daemonPodsRunning []*v1.Pod
 | |
| 		for _, pod := range daemonPods {
 | |
| 			if pod.DeletionTimestamp != nil {
 | |
| 				continue
 | |
| 			}
 | |
| 			if pod.Status.Phase == v1.PodFailed {
 | |
| 				failedPodsObserved++
 | |
| 
 | |
| 				// This is a critical place where DS is often fighting with kubelet that rejects pods.
 | |
| 				// We need to avoid hot looping and backoff.
 | |
| 				backoffKey := failedPodsBackoffKey(ds, node.Name)
 | |
| 
 | |
| 				now := dsc.failedPodsBackoff.Clock.Now()
 | |
| 				inBackoff := dsc.failedPodsBackoff.IsInBackOffSinceUpdate(backoffKey, now)
 | |
| 				if inBackoff {
 | |
| 					delay := dsc.failedPodsBackoff.Get(backoffKey)
 | |
| 					klog.V(4).Infof("Deleting failed pod %s/%s on node %s has been limited by backoff - %v remaining",
 | |
| 						pod.Namespace, pod.Name, node.Name, delay)
 | |
| 					dsc.enqueueDaemonSetAfter(ds, delay)
 | |
| 					continue
 | |
| 				}
 | |
| 
 | |
| 				dsc.failedPodsBackoff.Next(backoffKey, now)
 | |
| 
 | |
| 				msg := fmt.Sprintf("Found failed daemon pod %s/%s on node %s, will try to kill it", pod.Namespace, pod.Name, node.Name)
 | |
| 				klog.V(2).Infof(msg)
 | |
| 				// Emit an event so that it's discoverable to users.
 | |
| 				dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedDaemonPodReason, msg)
 | |
| 				podsToDelete = append(podsToDelete, pod.Name)
 | |
| 			} else {
 | |
| 				daemonPodsRunning = append(daemonPodsRunning, pod)
 | |
| 			}
 | |
| 		}
 | |
| 		// If daemon pod is supposed to be running on node, but more than 1 daemon pod is running, delete the excess daemon pods.
 | |
| 		// Sort the daemon pods by creation time, so the oldest is preserved.
 | |
| 		if len(daemonPodsRunning) > 1 {
 | |
| 			sort.Sort(podByCreationTimestampAndPhase(daemonPodsRunning))
 | |
| 			for i := 1; i < len(daemonPodsRunning); i++ {
 | |
| 				podsToDelete = append(podsToDelete, daemonPodsRunning[i].Name)
 | |
| 			}
 | |
| 		}
 | |
| 	case !shouldContinueRunning && exists:
 | |
| 		// If daemon pod isn't supposed to run on node, but it is, delete all daemon pods on node.
 | |
| 		for _, pod := range daemonPods {
 | |
| 			podsToDelete = append(podsToDelete, pod.Name)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nodesNeedingDaemonPods, podsToDelete, failedPodsObserved, nil
 | |
| }
 | |
| 
 | |
| // manage manages the scheduling and running of Pods of ds on nodes.
 | |
| // After figuring out which nodes should run a Pod of ds but not yet running one and
 | |
| // which nodes should not run a Pod of ds but currently running one, it calls function
 | |
| // syncNodes with a list of pods to remove and a list of nodes to run a Pod of ds.
 | |
| func (dsc *DaemonSetsController) manage(ds *apps.DaemonSet, hash string) error {
 | |
| 	// Find out the pods which are created for the nodes by DaemonSet.
 | |
| 	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
 | |
| 	}
 | |
| 
 | |
| 	// For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon
 | |
| 	// pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node.
 | |
| 	nodeList, err := dsc.nodeLister.List(labels.Everything())
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("couldn't get list of nodes when syncing daemon set %#v: %v", ds, err)
 | |
| 	}
 | |
| 	var nodesNeedingDaemonPods, podsToDelete []string
 | |
| 	var failedPodsObserved int
 | |
| 	for _, node := range nodeList {
 | |
| 		nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode, failedPodsObservedOnNode, err := dsc.podsShouldBeOnNode(
 | |
| 			node, nodeToDaemonPods, ds)
 | |
| 
 | |
| 		if err != nil {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodesNeedingDaemonPodsOnNode...)
 | |
| 		podsToDelete = append(podsToDelete, podsToDeleteOnNode...)
 | |
| 		failedPodsObserved += failedPodsObservedOnNode
 | |
| 	}
 | |
| 
 | |
| 	// Label new pods using the hash label value of the current history when creating them
 | |
| 	if err = dsc.syncNodes(ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	// Throw an error when the daemon pods fail, to use ratelimiter to prevent kill-recreate hot loop
 | |
| 	if failedPodsObserved > 0 {
 | |
| 		return fmt.Errorf("deleted %d failed pods of DaemonSet %s/%s", failedPodsObserved, ds.Namespace, ds.Name)
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // syncNodes deletes given pods and creates new daemon set pods on the given nodes
 | |
| // returns slice with erros if any
 | |
| func (dsc *DaemonSetsController) syncNodes(ds *apps.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error {
 | |
| 	// We need to set expectations before creating/deleting pods to avoid race conditions.
 | |
| 	dsKey, err := controller.KeyFunc(ds)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
 | |
| 	}
 | |
| 
 | |
| 	createDiff := len(nodesNeedingDaemonPods)
 | |
| 	deleteDiff := len(podsToDelete)
 | |
| 
 | |
| 	if createDiff > dsc.burstReplicas {
 | |
| 		createDiff = dsc.burstReplicas
 | |
| 	}
 | |
| 	if deleteDiff > dsc.burstReplicas {
 | |
| 		deleteDiff = dsc.burstReplicas
 | |
| 	}
 | |
| 
 | |
| 	dsc.expectations.SetExpectations(dsKey, createDiff, deleteDiff)
 | |
| 
 | |
| 	// error channel to communicate back failures.  make the buffer big enough to avoid any blocking
 | |
| 	errCh := make(chan error, createDiff+deleteDiff)
 | |
| 
 | |
| 	klog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v, creating %d", ds.Name, nodesNeedingDaemonPods, createDiff)
 | |
| 	createWait := sync.WaitGroup{}
 | |
| 	// If the returned error is not nil we have a parse error.
 | |
| 	// The controller handles this via the hash.
 | |
| 	generation, err := util.GetTemplateGeneration(ds)
 | |
| 	if err != nil {
 | |
| 		generation = nil
 | |
| 	}
 | |
| 	template := util.CreatePodTemplate(ds.Spec.Template, generation, hash)
 | |
| 	// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
 | |
| 	// and double with each successful iteration in a kind of "slow start".
 | |
| 	// This handles attempts to start large numbers of pods that would
 | |
| 	// likely all fail with the same error. For example a project with a
 | |
| 	// low quota that attempts to create a large number of pods will be
 | |
| 	// prevented from spamming the API service with the pod create requests
 | |
| 	// after one of its pods fails.  Conveniently, this also prevents the
 | |
| 	// event spam that those failures would generate.
 | |
| 	batchSize := integer.IntMin(createDiff, controller.SlowStartInitialBatchSize)
 | |
| 	for pos := 0; createDiff > pos; batchSize, pos = integer.IntMin(2*batchSize, createDiff-(pos+batchSize)), pos+batchSize {
 | |
| 		errorCount := len(errCh)
 | |
| 		createWait.Add(batchSize)
 | |
| 		for i := pos; i < pos+batchSize; i++ {
 | |
| 			go func(ix int) {
 | |
| 				defer createWait.Done()
 | |
| 				var err error
 | |
| 
 | |
| 				podTemplate := &template
 | |
| 
 | |
| 				if utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) {
 | |
| 					podTemplate = template.DeepCopy()
 | |
| 					// The pod's NodeAffinity will be updated to make sure the Pod is bound
 | |
| 					// to the target node by default scheduler. It is safe to do so because there
 | |
| 					// should be no conflicting node affinity with the target node.
 | |
| 					podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity(
 | |
| 						podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix])
 | |
| 
 | |
| 					err = dsc.podControl.CreatePodsWithControllerRef(ds.Namespace, podTemplate,
 | |
| 						ds, metav1.NewControllerRef(ds, controllerKind))
 | |
| 				} else {
 | |
| 					err = dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, podTemplate,
 | |
| 						ds, metav1.NewControllerRef(ds, controllerKind))
 | |
| 				}
 | |
| 
 | |
| 				if err != nil && errors.IsTimeout(err) {
 | |
| 					// Pod is created but its initialization has timed out.
 | |
| 					// If the initialization is successful eventually, the
 | |
| 					// controller will observe the creation via the informer.
 | |
| 					// If the initialization fails, or if the pod keeps
 | |
| 					// uninitialized for a long time, the informer will not
 | |
| 					// receive any update, and the controller will create a new
 | |
| 					// pod when the expectation expires.
 | |
| 					return
 | |
| 				}
 | |
| 				if err != nil {
 | |
| 					klog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
 | |
| 					dsc.expectations.CreationObserved(dsKey)
 | |
| 					errCh <- err
 | |
| 					utilruntime.HandleError(err)
 | |
| 				}
 | |
| 			}(i)
 | |
| 		}
 | |
| 		createWait.Wait()
 | |
| 		// any skipped pods that we never attempted to start shouldn't be expected.
 | |
| 		skippedPods := createDiff - batchSize
 | |
| 		if errorCount < len(errCh) && skippedPods > 0 {
 | |
| 			klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for set %q/%q", skippedPods, ds.Namespace, ds.Name)
 | |
| 			for i := 0; i < skippedPods; i++ {
 | |
| 				dsc.expectations.CreationObserved(dsKey)
 | |
| 			}
 | |
| 			// The skipped pods will be retried later. The next controller resync will
 | |
| 			// retry the slow start process.
 | |
| 			break
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	klog.V(4).Infof("Pods to delete for daemon set %s: %+v, deleting %d", ds.Name, podsToDelete, deleteDiff)
 | |
| 	deleteWait := sync.WaitGroup{}
 | |
| 	deleteWait.Add(deleteDiff)
 | |
| 	for i := 0; i < deleteDiff; i++ {
 | |
| 		go func(ix int) {
 | |
| 			defer deleteWait.Done()
 | |
| 			if err := dsc.podControl.DeletePod(ds.Namespace, podsToDelete[ix], ds); err != nil {
 | |
| 				klog.V(2).Infof("Failed deletion, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
 | |
| 				dsc.expectations.DeletionObserved(dsKey)
 | |
| 				errCh <- err
 | |
| 				utilruntime.HandleError(err)
 | |
| 			}
 | |
| 		}(i)
 | |
| 	}
 | |
| 	deleteWait.Wait()
 | |
| 
 | |
| 	// collect errors if any for proper reporting/retry logic in the controller
 | |
| 	errors := []error{}
 | |
| 	close(errCh)
 | |
| 	for err := range errCh {
 | |
| 		errors = append(errors, err)
 | |
| 	}
 | |
| 	return utilerrors.NewAggregate(errors)
 | |
| }
 | |
| 
 | |
| func storeDaemonSetStatus(dsClient unversionedapps.DaemonSetInterface, ds *apps.DaemonSet, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable, numberUnavailable int, updateObservedGen bool) error {
 | |
| 	if int(ds.Status.DesiredNumberScheduled) == desiredNumberScheduled &&
 | |
| 		int(ds.Status.CurrentNumberScheduled) == currentNumberScheduled &&
 | |
| 		int(ds.Status.NumberMisscheduled) == numberMisscheduled &&
 | |
| 		int(ds.Status.NumberReady) == numberReady &&
 | |
| 		int(ds.Status.UpdatedNumberScheduled) == updatedNumberScheduled &&
 | |
| 		int(ds.Status.NumberAvailable) == numberAvailable &&
 | |
| 		int(ds.Status.NumberUnavailable) == numberUnavailable &&
 | |
| 		ds.Status.ObservedGeneration >= ds.Generation {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	toUpdate := ds.DeepCopy()
 | |
| 
 | |
| 	var updateErr, getErr error
 | |
| 	for i := 0; i < StatusUpdateRetries; i++ {
 | |
| 		if updateObservedGen {
 | |
| 			toUpdate.Status.ObservedGeneration = ds.Generation
 | |
| 		}
 | |
| 		toUpdate.Status.DesiredNumberScheduled = int32(desiredNumberScheduled)
 | |
| 		toUpdate.Status.CurrentNumberScheduled = int32(currentNumberScheduled)
 | |
| 		toUpdate.Status.NumberMisscheduled = int32(numberMisscheduled)
 | |
| 		toUpdate.Status.NumberReady = int32(numberReady)
 | |
| 		toUpdate.Status.UpdatedNumberScheduled = int32(updatedNumberScheduled)
 | |
| 		toUpdate.Status.NumberAvailable = int32(numberAvailable)
 | |
| 		toUpdate.Status.NumberUnavailable = int32(numberUnavailable)
 | |
| 
 | |
| 		if _, updateErr = dsClient.UpdateStatus(toUpdate); updateErr == nil {
 | |
| 			return nil
 | |
| 		}
 | |
| 
 | |
| 		// Update the set with the latest resource version for the next poll
 | |
| 		if toUpdate, getErr = dsClient.Get(ds.Name, metav1.GetOptions{}); getErr != nil {
 | |
| 			// If the GET fails we can't trust status.Replicas anymore. This error
 | |
| 			// is bound to be more interesting than the update failure.
 | |
| 			return getErr
 | |
| 		}
 | |
| 	}
 | |
| 	return updateErr
 | |
| }
 | |
| 
 | |
| func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *apps.DaemonSet, hash string, updateObservedGen bool) error {
 | |
| 	klog.V(4).Infof("Updating daemon set status")
 | |
| 	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
 | |
| 	}
 | |
| 
 | |
| 	nodeList, err := dsc.nodeLister.List(labels.Everything())
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("couldn't get list of nodes when updating daemon set %#v: %v", ds, err)
 | |
| 	}
 | |
| 
 | |
| 	var desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable int
 | |
| 	for _, node := range nodeList {
 | |
| 		wantToRun, _, _, err := dsc.nodeShouldRunDaemonPod(node, ds)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		scheduled := len(nodeToDaemonPods[node.Name]) > 0
 | |
| 
 | |
| 		if wantToRun {
 | |
| 			desiredNumberScheduled++
 | |
| 			if scheduled {
 | |
| 				currentNumberScheduled++
 | |
| 				// Sort the daemon pods by creation time, so that the oldest is first.
 | |
| 				daemonPods, _ := nodeToDaemonPods[node.Name]
 | |
| 				sort.Sort(podByCreationTimestampAndPhase(daemonPods))
 | |
| 				pod := daemonPods[0]
 | |
| 				if podutil.IsPodReady(pod) {
 | |
| 					numberReady++
 | |
| 					if podutil.IsPodAvailable(pod, ds.Spec.MinReadySeconds, metav1.Now()) {
 | |
| 						numberAvailable++
 | |
| 					}
 | |
| 				}
 | |
| 				// If the returned error is not nil we have a parse error.
 | |
| 				// The controller handles this via the hash.
 | |
| 				generation, err := util.GetTemplateGeneration(ds)
 | |
| 				if err != nil {
 | |
| 					generation = nil
 | |
| 				}
 | |
| 				if util.IsPodUpdated(pod, hash, generation) {
 | |
| 					updatedNumberScheduled++
 | |
| 				}
 | |
| 			}
 | |
| 		} else {
 | |
| 			if scheduled {
 | |
| 				numberMisscheduled++
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	numberUnavailable := desiredNumberScheduled - numberAvailable
 | |
| 
 | |
| 	err = storeDaemonSetStatus(dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace), ds, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable, numberUnavailable, updateObservedGen)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("error storing status for daemon set %#v: %v", ds, err)
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
 | |
| 	startTime := time.Now()
 | |
| 	defer func() {
 | |
| 		klog.V(4).Infof("Finished syncing daemon set %q (%v)", key, time.Since(startTime))
 | |
| 	}()
 | |
| 
 | |
| 	namespace, name, err := cache.SplitMetaNamespaceKey(key)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	ds, err := dsc.dsLister.DaemonSets(namespace).Get(name)
 | |
| 	if errors.IsNotFound(err) {
 | |
| 		klog.V(3).Infof("daemon set has been deleted %v", key)
 | |
| 		dsc.expectations.DeleteExpectations(key)
 | |
| 		return nil
 | |
| 	}
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("unable to retrieve ds %v from store: %v", key, err)
 | |
| 	}
 | |
| 
 | |
| 	everything := metav1.LabelSelector{}
 | |
| 	if reflect.DeepEqual(ds.Spec.Selector, &everything) {
 | |
| 		dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, SelectingAllReason, "This daemon set is selecting all pods. A non-empty selector is required.")
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	// Don't process a daemon set until all its creations and deletions have been processed.
 | |
| 	// For example if daemon set foo asked for 3 new daemon pods in the previous call to manage,
 | |
| 	// then we do not want to call manage on foo until the daemon pods have been created.
 | |
| 	dsKey, err := controller.KeyFunc(ds)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
 | |
| 	}
 | |
| 
 | |
| 	// If the DaemonSet is being deleted (either by foreground deletion or
 | |
| 	// orphan deletion), we cannot be sure if the DaemonSet history objects
 | |
| 	// it owned still exist -- those history objects can either be deleted
 | |
| 	// or orphaned. Garbage collector doesn't guarantee that it will delete
 | |
| 	// DaemonSet pods before deleting DaemonSet history objects, because
 | |
| 	// DaemonSet history doesn't own DaemonSet pods. We cannot reliably
 | |
| 	// calculate the status of a DaemonSet being deleted. Therefore, return
 | |
| 	// here without updating status for the DaemonSet being deleted.
 | |
| 	if ds.DeletionTimestamp != nil {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	// Construct histories of the DaemonSet, and get the hash of current history
 | |
| 	cur, old, err := dsc.constructHistory(ds)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to construct revisions of DaemonSet: %v", err)
 | |
| 	}
 | |
| 	hash := cur.Labels[apps.DefaultDaemonSetUniqueLabelKey]
 | |
| 
 | |
| 	if !dsc.expectations.SatisfiedExpectations(dsKey) {
 | |
| 		// Only update status. Don't raise observedGeneration since controller didn't process object of that generation.
 | |
| 		return dsc.updateDaemonSetStatus(ds, hash, false)
 | |
| 	}
 | |
| 
 | |
| 	err = dsc.manage(ds, hash)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	// Process rolling updates if we're ready.
 | |
| 	if dsc.expectations.SatisfiedExpectations(dsKey) {
 | |
| 		switch ds.Spec.UpdateStrategy.Type {
 | |
| 		case apps.OnDeleteDaemonSetStrategyType:
 | |
| 		case apps.RollingUpdateDaemonSetStrategyType:
 | |
| 			err = dsc.rollingUpdate(ds, hash)
 | |
| 		}
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	err = dsc.cleanupHistory(ds, old)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to clean up revisions of DaemonSet: %v", err)
 | |
| 	}
 | |
| 
 | |
| 	return dsc.updateDaemonSetStatus(ds, hash, true)
 | |
| }
 | |
| 
 | |
| func (dsc *DaemonSetsController) simulate(newPod *v1.Pod, node *v1.Node, ds *apps.DaemonSet) ([]algorithm.PredicateFailureReason, *schedulernodeinfo.NodeInfo, error) {
 | |
| 	objects, err := dsc.podNodeIndex.ByIndex("nodeName", node.Name)
 | |
| 	if err != nil {
 | |
| 		return nil, nil, err
 | |
| 	}
 | |
| 
 | |
| 	nodeInfo := schedulernodeinfo.NewNodeInfo()
 | |
| 	nodeInfo.SetNode(node)
 | |
| 
 | |
| 	for _, obj := range objects {
 | |
| 		// Ignore pods that belong to the daemonset when taking into account whether a daemonset should bind to a node.
 | |
| 		pod, ok := obj.(*v1.Pod)
 | |
| 		if !ok {
 | |
| 			continue
 | |
| 		}
 | |
| 		if metav1.IsControlledBy(pod, ds) {
 | |
| 			continue
 | |
| 		}
 | |
| 		nodeInfo.AddPod(pod)
 | |
| 	}
 | |
| 
 | |
| 	_, reasons, err := Predicates(newPod, nodeInfo)
 | |
| 	return reasons, nodeInfo, err
 | |
| }
 | |
| 
 | |
| // nodeShouldRunDaemonPod checks a set of preconditions against a (node,daemonset) and returns a
 | |
| // summary. Returned booleans are:
 | |
| // * wantToRun:
 | |
| //     Returns true when a user would expect a pod to run on this node and ignores conditions
 | |
| //     such as DiskPressure or insufficient resource that would cause a daemonset pod not to schedule.
 | |
| //     This is primarily used to populate daemonset status.
 | |
| // * shouldSchedule:
 | |
| //     Returns true when a daemonset should be scheduled to a node if a daemonset pod is not already
 | |
| //     running on that node.
 | |
| // * shouldContinueRunning:
 | |
| //     Returns true when a daemonset should continue running on a node if a daemonset pod is already
 | |
| //     running on that node.
 | |
| func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *apps.DaemonSet) (wantToRun, shouldSchedule, shouldContinueRunning bool, err error) {
 | |
| 	newPod := NewPod(ds, node.Name)
 | |
| 
 | |
| 	// Because these bools require an && of all their required conditions, we start
 | |
| 	// with all bools set to true and set a bool to false if a condition is not met.
 | |
| 	// A bool should probably not be set to true after this line.
 | |
| 	wantToRun, shouldSchedule, shouldContinueRunning = true, true, true
 | |
| 	// If the daemon set specifies a node name, check that it matches with node.Name.
 | |
| 	if !(ds.Spec.Template.Spec.NodeName == "" || ds.Spec.Template.Spec.NodeName == node.Name) {
 | |
| 		return false, false, false, nil
 | |
| 	}
 | |
| 
 | |
| 	reasons, nodeInfo, err := dsc.simulate(newPod, node, ds)
 | |
| 	if err != nil {
 | |
| 		klog.Warningf("DaemonSet Predicates failed on node %s for ds '%s/%s' due to unexpected error: %v", node.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Name, err)
 | |
| 		return false, false, false, err
 | |
| 	}
 | |
| 
 | |
| 	// TODO(k82cn): When 'ScheduleDaemonSetPods' upgrade to beta or GA, remove unnecessary check on failure reason,
 | |
| 	//              e.g. InsufficientResourceError; and simplify "wantToRun, shouldSchedule, shouldContinueRunning"
 | |
| 	//              into one result, e.g. selectedNode.
 | |
| 	var insufficientResourceErr error
 | |
| 	for _, r := range reasons {
 | |
| 		klog.V(4).Infof("DaemonSet Predicates failed on node %s for ds '%s/%s' for reason: %v", node.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Name, r.GetReason())
 | |
| 		switch reason := r.(type) {
 | |
| 		case *predicates.InsufficientResourceError:
 | |
| 			insufficientResourceErr = reason
 | |
| 		case *predicates.PredicateFailureError:
 | |
| 			var emitEvent bool
 | |
| 			// we try to partition predicates into two partitions here: intentional on the part of the operator and not.
 | |
| 			switch reason {
 | |
| 			// intentional
 | |
| 			case
 | |
| 				predicates.ErrNodeSelectorNotMatch,
 | |
| 				predicates.ErrPodNotMatchHostName,
 | |
| 				predicates.ErrNodeLabelPresenceViolated,
 | |
| 				// this one is probably intentional since it's a workaround for not having
 | |
| 				// pod hard anti affinity.
 | |
| 				predicates.ErrPodNotFitsHostPorts:
 | |
| 				return false, false, false, nil
 | |
| 			case predicates.ErrTaintsTolerationsNotMatch:
 | |
| 				// DaemonSet is expected to respect taints and tolerations
 | |
| 				fitsNoExecute, _, err := predicates.PodToleratesNodeNoExecuteTaints(newPod, nil, nodeInfo)
 | |
| 				if err != nil {
 | |
| 					return false, false, false, err
 | |
| 				}
 | |
| 				if !fitsNoExecute {
 | |
| 					return false, false, false, nil
 | |
| 				}
 | |
| 				wantToRun, shouldSchedule = false, false
 | |
| 			// unintentional
 | |
| 			case
 | |
| 				predicates.ErrDiskConflict,
 | |
| 				predicates.ErrVolumeZoneConflict,
 | |
| 				predicates.ErrMaxVolumeCountExceeded,
 | |
| 				predicates.ErrNodeUnderMemoryPressure,
 | |
| 				predicates.ErrNodeUnderDiskPressure:
 | |
| 				// wantToRun and shouldContinueRunning are likely true here. They are
 | |
| 				// absolutely true at the time of writing the comment. See first comment
 | |
| 				// of this method.
 | |
| 				shouldSchedule = false
 | |
| 				emitEvent = true
 | |
| 			// unexpected
 | |
| 			case
 | |
| 				predicates.ErrPodAffinityNotMatch,
 | |
| 				predicates.ErrServiceAffinityViolated:
 | |
| 				klog.Warningf("unexpected predicate failure reason: %s", reason.GetReason())
 | |
| 				return false, false, false, fmt.Errorf("unexpected reason: DaemonSet Predicates should not return reason %s", reason.GetReason())
 | |
| 			default:
 | |
| 				klog.V(4).Infof("unknown predicate failure reason: %s", reason.GetReason())
 | |
| 				wantToRun, shouldSchedule, shouldContinueRunning = false, false, false
 | |
| 				emitEvent = true
 | |
| 			}
 | |
| 			if emitEvent {
 | |
| 				dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedPlacementReason, "failed to place pod on %q: %s", node.ObjectMeta.Name, reason.GetReason())
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	// only emit this event if insufficient resource is the only thing
 | |
| 	// preventing the daemon pod from scheduling
 | |
| 	if shouldSchedule && insufficientResourceErr != nil {
 | |
| 		dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedPlacementReason, "failed to place pod on %q: %s", node.ObjectMeta.Name, insufficientResourceErr.Error())
 | |
| 		shouldSchedule = false
 | |
| 	}
 | |
| 	return
 | |
| }
 | |
| 
 | |
| // NewPod creates a new pod
 | |
| func NewPod(ds *apps.DaemonSet, nodeName string) *v1.Pod {
 | |
| 	newPod := &v1.Pod{Spec: ds.Spec.Template.Spec, ObjectMeta: ds.Spec.Template.ObjectMeta}
 | |
| 	newPod.Namespace = ds.Namespace
 | |
| 	newPod.Spec.NodeName = nodeName
 | |
| 
 | |
| 	// Added default tolerations for DaemonSet pods.
 | |
| 	util.AddOrUpdateDaemonPodTolerations(&newPod.Spec)
 | |
| 
 | |
| 	return newPod
 | |
| }
 | |
| 
 | |
| // checkNodeFitness runs a set of predicates that select candidate nodes for the DaemonSet;
 | |
| // the predicates include:
 | |
| //   - PodFitsHost: checks pod's NodeName against node
 | |
| //   - PodMatchNodeSelector: checks pod's NodeSelector and NodeAffinity against node
 | |
| //   - PodToleratesNodeTaints: exclude tainted node unless pod has specific toleration
 | |
| func checkNodeFitness(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
 | |
| 	var predicateFails []algorithm.PredicateFailureReason
 | |
| 	fit, reasons, err := predicates.PodFitsHost(pod, meta, nodeInfo)
 | |
| 	if err != nil {
 | |
| 		return false, predicateFails, err
 | |
| 	}
 | |
| 	if !fit {
 | |
| 		predicateFails = append(predicateFails, reasons...)
 | |
| 	}
 | |
| 
 | |
| 	fit, reasons, err = predicates.PodMatchNodeSelector(pod, meta, nodeInfo)
 | |
| 	if err != nil {
 | |
| 		return false, predicateFails, err
 | |
| 	}
 | |
| 	if !fit {
 | |
| 		predicateFails = append(predicateFails, reasons...)
 | |
| 	}
 | |
| 
 | |
| 	fit, reasons, err = predicates.PodToleratesNodeTaints(pod, nil, nodeInfo)
 | |
| 	if err != nil {
 | |
| 		return false, predicateFails, err
 | |
| 	}
 | |
| 	if !fit {
 | |
| 		predicateFails = append(predicateFails, reasons...)
 | |
| 	}
 | |
| 	return len(predicateFails) == 0, predicateFails, nil
 | |
| }
 | |
| 
 | |
| // Predicates checks if a DaemonSet's pod can be scheduled on a node using GeneralPredicates
 | |
| // and PodToleratesNodeTaints predicate
 | |
| func Predicates(pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
 | |
| 	var predicateFails []algorithm.PredicateFailureReason
 | |
| 
 | |
| 	// If ScheduleDaemonSetPods is enabled, only check nodeSelector, nodeAffinity and toleration/taint match.
 | |
| 	if utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) {
 | |
| 		fit, reasons, err := checkNodeFitness(pod, nil, nodeInfo)
 | |
| 		if err != nil {
 | |
| 			return false, predicateFails, err
 | |
| 		}
 | |
| 		if !fit {
 | |
| 			predicateFails = append(predicateFails, reasons...)
 | |
| 		}
 | |
| 
 | |
| 		return len(predicateFails) == 0, predicateFails, nil
 | |
| 	}
 | |
| 
 | |
| 	critical := kubelettypes.IsCriticalPod(pod)
 | |
| 
 | |
| 	fit, reasons, err := predicates.PodToleratesNodeTaints(pod, nil, nodeInfo)
 | |
| 	if err != nil {
 | |
| 		return false, predicateFails, err
 | |
| 	}
 | |
| 	if !fit {
 | |
| 		predicateFails = append(predicateFails, reasons...)
 | |
| 	}
 | |
| 	if critical {
 | |
| 		// If the pod is marked as critical and support for critical pod annotations is enabled,
 | |
| 		// check predicates for critical pods only.
 | |
| 		fit, reasons, err = predicates.EssentialPredicates(pod, nil, nodeInfo)
 | |
| 	} else {
 | |
| 		fit, reasons, err = predicates.GeneralPredicates(pod, nil, nodeInfo)
 | |
| 	}
 | |
| 	if err != nil {
 | |
| 		return false, predicateFails, err
 | |
| 	}
 | |
| 	if !fit {
 | |
| 		predicateFails = append(predicateFails, reasons...)
 | |
| 	}
 | |
| 
 | |
| 	return len(predicateFails) == 0, predicateFails, nil
 | |
| }
 | |
| 
 | |
| type podByCreationTimestampAndPhase []*v1.Pod
 | |
| 
 | |
| func (o podByCreationTimestampAndPhase) Len() int      { return len(o) }
 | |
| func (o podByCreationTimestampAndPhase) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
 | |
| 
 | |
| func (o podByCreationTimestampAndPhase) Less(i, j int) bool {
 | |
| 	// Scheduled Pod first
 | |
| 	if len(o[i].Spec.NodeName) != 0 && len(o[j].Spec.NodeName) == 0 {
 | |
| 		return true
 | |
| 	}
 | |
| 
 | |
| 	if len(o[i].Spec.NodeName) == 0 && len(o[j].Spec.NodeName) != 0 {
 | |
| 		return false
 | |
| 	}
 | |
| 
 | |
| 	if o[i].CreationTimestamp.Equal(&o[j].CreationTimestamp) {
 | |
| 		return o[i].Name < o[j].Name
 | |
| 	}
 | |
| 	return o[i].CreationTimestamp.Before(&o[j].CreationTimestamp)
 | |
| }
 | |
| 
 | |
| func failedPodsBackoffKey(ds *apps.DaemonSet, nodeName string) string {
 | |
| 	return fmt.Sprintf("%s/%d/%s", ds.UID, ds.Status.ObservedGeneration, nodeName)
 | |
| }
 |