Merge pull request #65009 from mfojtik/ds-02-add-node-indexer
Automatic merge from submit-queue (batch tested with PRs 64974, 65009, 65018). If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

daemon: add custom node indexer

**What this PR does / why we need it**:

![screen shot 2018-06-11 at 20 54 03](https://user-images.githubusercontent.com/44136/41279030-ad842020-6e2b-11e8-80d4-0a71ee415d30.png)

Based on this CPU profile, it looks like a lot of CPU cycles are spent retrieving a list of **all** pods in the cluster. On large clusters with multiple daemonsets, this might block the shared pod informer's List() for every other controller that needs or uses it. The indexer in this PR indexes pods by the nodeName assigned to them, so the number of pods returned from the ByIndex() function is fairly small and the call should be fast. Additionally, we can also use this index to check whether a node already runs the pod without listing all pods in the cluster again.

**Special notes for your reviewer**:

**Release note**:

```release-note
NONE
```
Commit: e7bdebd5f1
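For context on the mechanism the description refers to, here is a minimal sketch of the client-go pattern involved: register an IndexFunc on the shared pod informer, then query it with ByIndex instead of listing every pod. This is illustrative only, not the controller's code; the helper names `registerNodeNameIndex` and `podsOnNode` are hypothetical, and a shared informer factory is assumed to be started elsewhere.

```go
package example

import (
    v1 "k8s.io/api/core/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/tools/cache"
)

// registerNodeNameIndex registers a "nodeName" index on the shared pod informer.
// It has to run before the informer is started.
func registerNodeNameIndex(factory informers.SharedInformerFactory) error {
    return factory.Core().V1().Pods().Informer().AddIndexers(cache.Indexers{
        "nodeName": func(obj interface{}) ([]string, error) {
            pod, ok := obj.(*v1.Pod)
            if !ok || len(pod.Spec.NodeName) == 0 {
                // Non-pod objects and unscheduled pods are simply not indexed.
                return []string{}, nil
            }
            return []string{pod.Spec.NodeName}, nil
        },
    })
}

// podsOnNode asks the index only for the pods bound to nodeName, instead of
// listing every pod in the cluster and filtering on Spec.NodeName.
func podsOnNode(factory informers.SharedInformerFactory, nodeName string) ([]*v1.Pod, error) {
    objs, err := factory.Core().V1().Pods().Informer().GetIndexer().ByIndex("nodeName", nodeName)
    if err != nil {
        return nil, err
    }
    pods := make([]*v1.Pod, 0, len(objs))
    for _, obj := range objs {
        if pod, ok := obj.(*v1.Pod); ok {
            pods = append(pods, pod)
        }
    }
    return pods, nil
}
```

The same two steps appear in the hunks below: AddIndexers() in NewDaemonSetsController and ByIndex() in simulate().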
@@ -113,6 +113,8 @@ type DaemonSetsController struct {
     historyStoreSynced cache.InformerSynced
     // podLister get list/get pods from the shared informers's store
     podLister corelisters.PodLister
+    // podNodeIndex indexes pods by their nodeName
+    podNodeIndex cache.Indexer
     // podStoreSynced returns true if the pod store has been synced at least once.
     // Added as a member to the struct to allow injection for testing.
     podStoreSynced cache.InformerSynced
@@ -191,6 +193,12 @@ func NewDaemonSetsController(daemonSetInformer appsinformers.DaemonSetInformer,
         DeleteFunc: dsc.deletePod,
     })
     dsc.podLister = podInformer.Lister()
+
+    // This custom indexer will index pods based on their NodeName which will decrease the amount of pods we need to get in simulate() call.
+    podInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{
+        "nodeName": indexByPodNodeName,
+    })
+    dsc.podNodeIndex = podInformer.Informer().GetIndexer()
     dsc.podStoreSynced = podInformer.Informer().HasSynced

     nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -207,6 +215,18 @@ func NewDaemonSetsController(daemonSetInformer appsinformers.DaemonSetInformer,
     return dsc, nil
 }

+func indexByPodNodeName(obj interface{}) ([]string, error) {
+    pod, ok := obj.(*v1.Pod)
+    if !ok {
+        return []string{}, nil
+    }
+    // We are only interested in active pods with nodeName set
+    if len(pod.Spec.NodeName) == 0 || pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
+        return []string{}, nil
+    }
+    return []string{pod.Spec.NodeName}, nil
+}
+
 func (dsc *DaemonSetsController) deleteDaemonset(obj interface{}) {
     ds, ok := obj.(*apps.DaemonSet)
     if !ok {
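As a side note (not part of the PR), an index function like the one above can be exercised in isolation with a plain cache.NewIndexer. The sketch below is illustrative: the `pod` helper is hypothetical, and the index function mirrors the one added in the hunk above; only active, scheduled pods come back from ByIndex.

```go
package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/cache"
)

// indexByPodNodeName mirrors the function added in the hunk above: active,
// scheduled pods are indexed under their node name, everything else is skipped.
func indexByPodNodeName(obj interface{}) ([]string, error) {
    pod, ok := obj.(*v1.Pod)
    if !ok {
        return []string{}, nil
    }
    if len(pod.Spec.NodeName) == 0 || pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
        return []string{}, nil
    }
    return []string{pod.Spec.NodeName}, nil
}

// pod is a small test fixture helper (hypothetical, for this sketch only).
func pod(name, node string, phase v1.PodPhase) *v1.Pod {
    return &v1.Pod{
        ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: "default"},
        Spec:       v1.PodSpec{NodeName: node},
        Status:     v1.PodStatus{Phase: phase},
    }
}

func main() {
    indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"nodeName": indexByPodNodeName})

    indexer.Add(pod("running-on-a", "node-a", v1.PodRunning))
    indexer.Add(pod("running-on-b", "node-b", v1.PodRunning))
    indexer.Add(pod("finished-on-a", "node-a", v1.PodSucceeded))
    indexer.Add(pod("unscheduled", "", v1.PodPending))

    objs, _ := indexer.ByIndex("nodeName", "node-a")
    for _, obj := range objs {
        fmt.Println(obj.(*v1.Pod).Name) // prints only "running-on-a"
    }
}
```

This is essentially what the test hunk at the bottom relies on when it calls manager.podNodeIndex.Add(p) for pods placed on "test-node".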
@@ -1272,31 +1292,27 @@ func (dsc *DaemonSetsController) simulate(newPod *v1.Pod, node *v1.Node, ds *app
         })
     }

-    pods := []*v1.Pod{}
-
-    podList, err := dsc.podLister.List(labels.Everything())
+    objects, err := dsc.podNodeIndex.ByIndex("nodeName", node.Name)
     if err != nil {
         return nil, nil, err
     }
-    for _, pod := range podList {
-        if pod.Spec.NodeName != node.Name {
-            continue
-        }
-        if pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
-            continue
-        }
-        // ignore pods that belong to the daemonset when taking into account whether
-        // a daemonset should bind to a node.
+
+    nodeInfo := schedulercache.NewNodeInfo()
+    nodeInfo.SetNode(node)
+
+    for _, obj := range objects {
+        // Ignore pods that belong to the daemonset when taking into account whether a daemonset should bind to a node.
+        // TODO: replace this with metav1.IsControlledBy() in 1.12
+        pod, ok := obj.(*v1.Pod)
+        if !ok {
+            continue
+        }
         if isControlledByDaemonSet(pod, ds.GetUID()) {
             continue
         }
-        pods = append(pods, pod)
+        nodeInfo.AddPod(pod)
     }

-    nodeInfo := schedulercache.NewNodeInfo(pods...)
-    nodeInfo.SetNode(node)
-
     _, reasons, err := Predicates(newPod, nodeInfo)
     return reasons, nodeInfo, err
 }
@@ -1953,6 +1953,7 @@ func TestNodeShouldRunDaemonPod(t *testing.T) {
     for _, p := range c.podsOnNode {
         manager.podStore.Add(p)
         p.Spec.NodeName = "test-node"
+        manager.podNodeIndex.Add(p)
     }
     c.ds.Spec.UpdateStrategy = *strategy
     wantToRun, shouldSchedule, shouldContinueRunning, err := manager.nodeShouldRunDaemonPod(node, c.ds)