diff --git a/pkg/controller/daemon/BUILD b/pkg/controller/daemon/BUILD index 372d7e3598f..2bdff35dd16 100644 --- a/pkg/controller/daemon/BUILD +++ b/pkg/controller/daemon/BUILD @@ -32,7 +32,6 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/json:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/client-go/informers/apps/v1:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", diff --git a/pkg/controller/daemon/daemon_controller.go b/pkg/controller/daemon/daemon_controller.go index 7a0a92d4790..fd2b93f4f6e 100644 --- a/pkg/controller/daemon/daemon_controller.go +++ b/pkg/controller/daemon/daemon_controller.go @@ -33,7 +33,6 @@ import ( "k8s.io/apimachinery/pkg/labels" utilerrors "k8s.io/apimachinery/pkg/util/errors" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" appsinformers "k8s.io/client-go/informers/apps/v1" coreinformers "k8s.io/client-go/informers/core/v1" @@ -96,8 +95,7 @@ type DaemonSetsController struct { // To allow injection of syncDaemonSet for testing. syncHandler func(dsKey string) error // used for unit testing - enqueueDaemonSet func(ds *apps.DaemonSet) - enqueueDaemonSetRateLimited func(ds *apps.DaemonSet) + enqueueDaemonSet func(ds *apps.DaemonSet) // A TTLCache of pod creates/deletes each ds expects to see expectations controller.ControllerExpectationsInterface // dsLister can list/get daemonsets from the shared informer's store @@ -126,11 +124,6 @@ type DaemonSetsController struct { // DaemonSet keys that need to be synced. queue workqueue.RateLimitingInterface - // The DaemonSet that has suspended pods on nodes; the key is node name, the value - // is DaemonSet set that want to run pods but can't schedule in latest syncup cycle. - suspendedDaemonPodsMutex sync.Mutex - suspendedDaemonPods map[string]sets.String - failedPodsBackoff *flowcontrol.Backoff } @@ -162,10 +155,9 @@ func NewDaemonSetsController( crControl: controller.RealControllerRevisionControl{ KubeClient: kubeClient, }, - burstReplicas: BurstReplicas, - expectations: controller.NewControllerExpectations(), - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "daemonset"), - suspendedDaemonPods: map[string]sets.String{}, + burstReplicas: BurstReplicas, + expectations: controller.NewControllerExpectations(), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "daemonset"), } daemonSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -219,7 +211,6 @@ func NewDaemonSetsController( dsc.syncHandler = dsc.syncDaemonSet dsc.enqueueDaemonSet = dsc.enqueue - dsc.enqueueDaemonSetRateLimited = dsc.enqueueRateLimited dsc.failedPodsBackoff = failedPodsBackoff @@ -584,67 +575,6 @@ func (dsc *DaemonSetsController) updatePod(old, cur interface{}) { } } -// listSuspendedDaemonPods lists the Daemon pods that 'want to run, but should not schedule' -// for the node. -func (dsc *DaemonSetsController) listSuspendedDaemonPods(node string) (dss []string) { - dsc.suspendedDaemonPodsMutex.Lock() - defer dsc.suspendedDaemonPodsMutex.Unlock() - - if _, found := dsc.suspendedDaemonPods[node]; !found { - return nil - } - - for k := range dsc.suspendedDaemonPods[node] { - dss = append(dss, k) - } - return -} - -// requeueSuspendedDaemonPods enqueues all DaemonSets which has pods that 'want to run, -// but should not schedule' for the node; so DaemonSetController will sync up them again. -func (dsc *DaemonSetsController) requeueSuspendedDaemonPods(node string) { - dss := dsc.listSuspendedDaemonPods(node) - for _, dsKey := range dss { - if ns, name, err := cache.SplitMetaNamespaceKey(dsKey); err != nil { - klog.Errorf("Failed to get DaemonSet's namespace and name from %s: %v", dsKey, err) - continue - } else if ds, err := dsc.dsLister.DaemonSets(ns).Get(name); err != nil { - klog.Errorf("Failed to get DaemonSet %s/%s: %v", ns, name, err) - continue - } else { - dsc.enqueueDaemonSetRateLimited(ds) - } - } -} - -// addSuspendedDaemonPods adds DaemonSet which has pods that 'want to run, -// but should not schedule' for the node to the suspended queue. -func (dsc *DaemonSetsController) addSuspendedDaemonPods(node, ds string) { - dsc.suspendedDaemonPodsMutex.Lock() - defer dsc.suspendedDaemonPodsMutex.Unlock() - - if _, found := dsc.suspendedDaemonPods[node]; !found { - dsc.suspendedDaemonPods[node] = sets.NewString() - } - dsc.suspendedDaemonPods[node].Insert(ds) -} - -// removeSuspendedDaemonPods removes DaemonSet which has pods that 'want to run, -// but should not schedule' for the node from suspended queue. -func (dsc *DaemonSetsController) removeSuspendedDaemonPods(node, ds string) { - dsc.suspendedDaemonPodsMutex.Lock() - defer dsc.suspendedDaemonPodsMutex.Unlock() - - if _, found := dsc.suspendedDaemonPods[node]; !found { - return - } - dsc.suspendedDaemonPods[node].Delete(ds) - - if len(dsc.suspendedDaemonPods[node]) == 0 { - delete(dsc.suspendedDaemonPods, node) - } -} - func (dsc *DaemonSetsController) deletePod(obj interface{}) { pod, ok := obj.(*v1.Pod) // When a delete is dropped, the relist will notice a pod in the store not @@ -668,18 +598,10 @@ func (dsc *DaemonSetsController) deletePod(obj interface{}) { controllerRef := metav1.GetControllerOf(pod) if controllerRef == nil { // No controller should care about orphans being deleted. - if len(pod.Spec.NodeName) != 0 { - // If scheduled pods were deleted, requeue suspended daemon pods. - dsc.requeueSuspendedDaemonPods(pod.Spec.NodeName) - } return } ds := dsc.resolveControllerRef(pod.Namespace, controllerRef) if ds == nil { - if len(pod.Spec.NodeName) != 0 { - // If scheduled pods were deleted, requeue suspended daemon pods. - dsc.requeueSuspendedDaemonPods(pod.Spec.NodeName) - } return } dsKey, err := controller.KeyFunc(ds) @@ -866,24 +788,14 @@ func (dsc *DaemonSetsController) podsShouldBeOnNode( ds *apps.DaemonSet, ) (nodesNeedingDaemonPods, podsToDelete []string, err error) { - wantToRun, shouldSchedule, shouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(node, ds) + _, shouldSchedule, shouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(node, ds) if err != nil { return } daemonPods, exists := nodeToDaemonPods[node.Name] - dsKey, err := cache.MetaNamespaceKeyFunc(ds) - if err != nil { - utilruntime.HandleError(err) - return - } - - dsc.removeSuspendedDaemonPods(node.Name, dsKey) switch { - case wantToRun && !shouldSchedule: - // If daemon pod is supposed to run, but can not be scheduled, add to suspended list. - dsc.addSuspendedDaemonPods(node.Name, dsKey) case shouldSchedule && !exists: // If daemon pod is supposed to be running on node, but isn't, create daemon pod. nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, node.Name) diff --git a/pkg/controller/daemon/daemon_controller_test.go b/pkg/controller/daemon/daemon_controller_test.go index 3108c4542ce..913e32d4fc2 100644 --- a/pkg/controller/daemon/daemon_controller_test.go +++ b/pkg/controller/daemon/daemon_controller_test.go @@ -1974,7 +1974,6 @@ func TestUpdateNode(t *testing.T) { // DaemonSets should be resynced when non-daemon pods was deleted. func TestDeleteNoDaemonPod(t *testing.T) { - var enqueued bool cases := []struct { @@ -2132,12 +2131,6 @@ func TestDeleteNoDaemonPod(t *testing.T) { t.Fatalf("unexpected UpdateStrategy %+v", strategy) } - manager.enqueueDaemonSetRateLimited = func(ds *apps.DaemonSet) { - if ds.Name == "ds" { - enqueued = true - } - } - enqueued = false manager.deletePod(c.deletedPod) if enqueued != c.shouldEnqueue {