Merge pull request #84323 from draveness/feature/remove-suspended-daemon-pods

feat: remove suspendedDaemonPods from daemon controller
Kubernetes Prow Robot 2019-11-02 23:41:40 -07:00 committed by GitHub
commit d8ab3f26e7
3 changed files with 5 additions and 101 deletions
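For context, the mechanism this PR deletes is a mutex-guarded map from node name to the set of DaemonSet keys that "want to run, but should not schedule" on that node, plus helpers to add, remove, and list entries. Below is a condensed, self-contained sketch of that bookkeeping pattern; the suspendedTracker type and package name are illustrative shorthand, not the controller's real layout, and the actual fields and methods appear in the removed code further down.

package daemonsketch

import (
	"sync"

	"k8s.io/apimachinery/pkg/util/sets"
)

// suspendedTracker condenses the suspendedDaemonPods bookkeeping removed by
// this PR: node name -> set of DaemonSet keys ("namespace/name") that wanted
// to run on the node but could not be scheduled in the last sync cycle.
type suspendedTracker struct {
	mu        sync.Mutex
	suspended map[string]sets.String
}

// newSuspendedTracker mirrors the map initialization done in
// NewDaemonSetsController before this change.
func newSuspendedTracker() *suspendedTracker {
	return &suspendedTracker{suspended: map[string]sets.String{}}
}

// add records that the DaemonSet identified by dsKey is suspended on node.
func (t *suspendedTracker) add(node, dsKey string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if _, found := t.suspended[node]; !found {
		t.suspended[node] = sets.NewString()
	}
	t.suspended[node].Insert(dsKey)
}

// remove drops dsKey for node and cleans up the node entry once it is empty.
func (t *suspendedTracker) remove(node, dsKey string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if _, found := t.suspended[node]; !found {
		return
	}
	t.suspended[node].Delete(dsKey)
	if t.suspended[node].Len() == 0 {
		delete(t.suspended, node)
	}
}

// list returns the DaemonSet keys currently suspended on node.
func (t *suspendedTracker) list(node string) []string {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.suspended[node].List()
}

The controller kept one such map per DaemonSetsController and requeued every listed DaemonSet (via the rate-limited enqueue hook) whenever a pod on the node was deleted; that indirection is what the diff below removes.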


@@ -32,7 +32,6 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/json:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/informers/apps/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",


@@ -33,7 +33,6 @@ import (
"k8s.io/apimachinery/pkg/labels"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
appsinformers "k8s.io/client-go/informers/apps/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
@@ -97,7 +96,6 @@ type DaemonSetsController struct {
syncHandler func(dsKey string) error
// used for unit testing
enqueueDaemonSet func(ds *apps.DaemonSet)
enqueueDaemonSetRateLimited func(ds *apps.DaemonSet)
// A TTLCache of pod creates/deletes each ds expects to see
expectations controller.ControllerExpectationsInterface
// dsLister can list/get daemonsets from the shared informer's store
@@ -126,11 +124,6 @@ type DaemonSetsController struct {
// DaemonSet keys that need to be synced.
queue workqueue.RateLimitingInterface
// The DaemonSet that has suspended pods on nodes; the key is node name, the value
// is DaemonSet set that want to run pods but can't schedule in latest syncup cycle.
suspendedDaemonPodsMutex sync.Mutex
suspendedDaemonPods map[string]sets.String
failedPodsBackoff *flowcontrol.Backoff
}
@@ -165,7 +158,6 @@ func NewDaemonSetsController(
burstReplicas: BurstReplicas,
expectations: controller.NewControllerExpectations(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "daemonset"),
suspendedDaemonPods: map[string]sets.String{},
}
daemonSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -219,7 +211,6 @@ func NewDaemonSetsController(
dsc.syncHandler = dsc.syncDaemonSet
dsc.enqueueDaemonSet = dsc.enqueue
dsc.enqueueDaemonSetRateLimited = dsc.enqueueRateLimited
dsc.failedPodsBackoff = failedPodsBackoff
@@ -584,67 +575,6 @@ func (dsc *DaemonSetsController) updatePod(old, cur interface{}) {
}
}

// listSuspendedDaemonPods lists the Daemon pods that 'want to run, but should not schedule'
// for the node.
func (dsc *DaemonSetsController) listSuspendedDaemonPods(node string) (dss []string) {
dsc.suspendedDaemonPodsMutex.Lock()
defer dsc.suspendedDaemonPodsMutex.Unlock()
if _, found := dsc.suspendedDaemonPods[node]; !found {
return nil
}
for k := range dsc.suspendedDaemonPods[node] {
dss = append(dss, k)
}
return
}

// requeueSuspendedDaemonPods enqueues all DaemonSets which has pods that 'want to run,
// but should not schedule' for the node; so DaemonSetController will sync up them again.
func (dsc *DaemonSetsController) requeueSuspendedDaemonPods(node string) {
dss := dsc.listSuspendedDaemonPods(node)
for _, dsKey := range dss {
if ns, name, err := cache.SplitMetaNamespaceKey(dsKey); err != nil {
klog.Errorf("Failed to get DaemonSet's namespace and name from %s: %v", dsKey, err)
continue
} else if ds, err := dsc.dsLister.DaemonSets(ns).Get(name); err != nil {
klog.Errorf("Failed to get DaemonSet %s/%s: %v", ns, name, err)
continue
} else {
dsc.enqueueDaemonSetRateLimited(ds)
}
}
}

// addSuspendedDaemonPods adds DaemonSet which has pods that 'want to run,
// but should not schedule' for the node to the suspended queue.
func (dsc *DaemonSetsController) addSuspendedDaemonPods(node, ds string) {
dsc.suspendedDaemonPodsMutex.Lock()
defer dsc.suspendedDaemonPodsMutex.Unlock()
if _, found := dsc.suspendedDaemonPods[node]; !found {
dsc.suspendedDaemonPods[node] = sets.NewString()
}
dsc.suspendedDaemonPods[node].Insert(ds)
}

// removeSuspendedDaemonPods removes DaemonSet which has pods that 'want to run,
// but should not schedule' for the node from suspended queue.
func (dsc *DaemonSetsController) removeSuspendedDaemonPods(node, ds string) {
dsc.suspendedDaemonPodsMutex.Lock()
defer dsc.suspendedDaemonPodsMutex.Unlock()
if _, found := dsc.suspendedDaemonPods[node]; !found {
return
}
dsc.suspendedDaemonPods[node].Delete(ds)
if len(dsc.suspendedDaemonPods[node]) == 0 {
delete(dsc.suspendedDaemonPods, node)
}
}

func (dsc *DaemonSetsController) deletePod(obj interface{}) {
pod, ok := obj.(*v1.Pod)
// When a delete is dropped, the relist will notice a pod in the store not
@@ -668,18 +598,10 @@ func (dsc *DaemonSetsController) deletePod(obj interface{}) {
controllerRef := metav1.GetControllerOf(pod)
if controllerRef == nil {
// No controller should care about orphans being deleted.
if len(pod.Spec.NodeName) != 0 {
// If scheduled pods were deleted, requeue suspended daemon pods.
dsc.requeueSuspendedDaemonPods(pod.Spec.NodeName)
}
return
}
ds := dsc.resolveControllerRef(pod.Namespace, controllerRef)
if ds == nil {
if len(pod.Spec.NodeName) != 0 {
// If scheduled pods were deleted, requeue suspended daemon pods.
dsc.requeueSuspendedDaemonPods(pod.Spec.NodeName)
}
return
}
dsKey, err := controller.KeyFunc(ds)
@@ -866,24 +788,14 @@ func (dsc *DaemonSetsController) podsShouldBeOnNode(
ds *apps.DaemonSet,
) (nodesNeedingDaemonPods, podsToDelete []string, err error) {
wantToRun, shouldSchedule, shouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(node, ds)
_, shouldSchedule, shouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(node, ds)
if err != nil {
return
}
daemonPods, exists := nodeToDaemonPods[node.Name]
dsKey, err := cache.MetaNamespaceKeyFunc(ds)
if err != nil {
utilruntime.HandleError(err)
return
}
dsc.removeSuspendedDaemonPods(node.Name, dsKey)
switch {
case wantToRun && !shouldSchedule:
// If daemon pod is supposed to run, but can not be scheduled, add to suspended list.
dsc.addSuspendedDaemonPods(node.Name, dsKey)
case shouldSchedule && !exists:
// If daemon pod is supposed to be running on node, but isn't, create daemon pod.
nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, node.Name)
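With the suspended-pods map gone, podsShouldBeOnNode no longer needs the DaemonSet key or the wantToRun signal; the per-node outcome reduces to create/keep/delete decisions. The following is a simplified illustration of the remaining branches, not the controller's actual code: decide is a hypothetical helper, its boolean parameters stand in for the values derived from nodeShouldRunDaemonPod and nodeToDaemonPods, and the failed-pod handling driven by failedPodsBackoff under shouldContinueRunning is omitted.

package daemonsketch

// decide condenses the per-node branches left in podsShouldBeOnNode after
// this change: no suspended-pods bookkeeping, just whether to create a
// missing daemon pod or delete one that should no longer run.
func decide(shouldSchedule, shouldContinueRunning, hasDaemonPod bool) (createPod, deletePods bool) {
	switch {
	case shouldSchedule && !hasDaemonPod:
		// The daemon pod should be running on the node but is not: create it.
		createPod = true
	case !shouldContinueRunning && hasDaemonPod:
		// The daemon pod should no longer run on the node but still exists: delete it.
		deletePods = true
	}
	return createPod, deletePods
}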


@@ -1974,7 +1974,6 @@ func TestUpdateNode(t *testing.T) {
// DaemonSets should be resynced when non-daemon pods was deleted.
func TestDeleteNoDaemonPod(t *testing.T) {
var enqueued bool
cases := []struct {
@@ -2132,12 +2131,6 @@ func TestDeleteNoDaemonPod(t *testing.T) {
t.Fatalf("unexpected UpdateStrategy %+v", strategy)
}
manager.enqueueDaemonSetRateLimited = func(ds *apps.DaemonSet) {
if ds.Name == "ds" {
enqueued = true
}
}
enqueued = false
manager.deletePod(c.deletedPod)
if enqueued != c.shouldEnqueue {