Mirror of https://github.com/k3s-io/kubernetes.git, synced 2025-09-02 01:39:02 +00:00
daemon: add custom node indexer
@@ -113,6 +113,8 @@ type DaemonSetsController struct {
 	historyStoreSynced cache.InformerSynced
 	// podLister get list/get pods from the shared informers's store
 	podLister corelisters.PodLister
+	// podNodeIndex indexes pods by their nodeName
+	podNodeIndex cache.Indexer
 	// podStoreSynced returns true if the pod store has been synced at least once.
 	// Added as a member to the struct to allow injection for testing.
 	podStoreSynced cache.InformerSynced
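
A note on the new field: cache.Indexer is client-go's indexed store. Alongside the primary key, it maintains named secondary indexes that map an index value (here, a node name) to the set of stored objects, so lookups avoid scanning the whole cache. A minimal standalone sketch of that behavior, separate from this commit (pod and node names are made up):

package main

import (
	"fmt"

	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// NewIndexer takes the primary key function plus named index functions.
	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
		"nodeName": func(obj interface{}) ([]string, error) {
			return []string{obj.(*v1.Pod).Spec.NodeName}, nil
		},
	})

	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "web-0", Namespace: "default"},
		Spec:       v1.PodSpec{NodeName: "node-a"},
	}
	if err := indexer.Add(pod); err != nil {
		panic(err)
	}

	// ByIndex returns only the objects whose index function emitted this value.
	objs, err := indexer.ByIndex("nodeName", "node-a")
	if err != nil {
		panic(err)
	}
	fmt.Println(len(objs)) // 1
}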
@@ -191,6 +193,12 @@ func NewDaemonSetsController(daemonSetInformer appsinformers.DaemonSetInformer,
 		DeleteFunc: dsc.deletePod,
 	})
 	dsc.podLister = podInformer.Lister()
+
+	// This custom indexer indexes pods by their nodeName, which decreases the number of pods we need to fetch in each simulate() call.
+	podInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{
+		"nodeName": indexByPodNodeName,
+	})
+	dsc.podNodeIndex = podInformer.Informer().GetIndexer()
 	dsc.podStoreSynced = podInformer.Informer().HasSynced
 
 	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
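
The wiring above piggybacks on the shared pod informer's store and keeps a handle to it in dsc.podNodeIndex, so the index is maintained for free as the informer syncs. One ordering constraint is implicit: AddIndexers must run before the informer starts, because it returns an error once the store already holds items. A sketch of the same wiring in isolation; the kubeconfig path, resync period, and "node-a" lookup are illustrative assumptions:

package main

import (
	"fmt"
	"time"

	"k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	factory := informers.NewSharedInformerFactory(client, 30*time.Second)
	podInformer := factory.Core().V1().Pods()

	// Register the index first; this fails once the store is populated.
	if err := podInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{
		"nodeName": func(obj interface{}) ([]string, error) {
			pod, ok := obj.(*v1.Pod)
			if !ok || pod.Spec.NodeName == "" {
				return nil, nil
			}
			return []string{pod.Spec.NodeName}, nil
		},
	}); err != nil {
		panic(err)
	}

	stopCh := make(chan struct{})
	defer close(stopCh)
	factory.Start(stopCh)
	cache.WaitForCacheSync(stopCh, podInformer.Informer().HasSynced)

	// Each lookup now touches only the pods indexed under that node name.
	pods, _ := podInformer.Informer().GetIndexer().ByIndex("nodeName", "node-a")
	fmt.Printf("pods on node-a: %d\n", len(pods))
}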
@@ -207,6 +215,18 @@ func NewDaemonSetsController(daemonSetInformer appsinformers.DaemonSetInformer,
 	return dsc, nil
 }
 
+func indexByPodNodeName(obj interface{}) ([]string, error) {
+	pod, ok := obj.(*v1.Pod)
+	if !ok {
+		return []string{}, nil
+	}
+	// We are only interested in active pods with nodeName set
+	if len(pod.Spec.NodeName) == 0 || pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
+		return []string{}, nil
+	}
+	return []string{pod.Spec.NodeName}, nil
+}
+
 func (dsc *DaemonSetsController) deleteDaemonset(obj interface{}) {
 	ds, ok := obj.(*apps.DaemonSet)
 	if !ok {
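
indexByPodNodeName deliberately emits no keys for unscheduled or terminal pods, so those never appear in per-node lookups. A small runnable check of that behavior, reusing the function from the hunk above (pod and node names are made up):

package main

import (
	"fmt"

	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// indexByPodNodeName is copied from the commit for illustration.
func indexByPodNodeName(obj interface{}) ([]string, error) {
	pod, ok := obj.(*v1.Pod)
	if !ok {
		return []string{}, nil
	}
	if len(pod.Spec.NodeName) == 0 || pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
		return []string{}, nil
	}
	return []string{pod.Spec.NodeName}, nil
}

func main() {
	pods := []*v1.Pod{
		{
			ObjectMeta: metav1.ObjectMeta{Name: "running-pod"},
			Spec:       v1.PodSpec{NodeName: "node-a"},
			Status:     v1.PodStatus{Phase: v1.PodRunning},
		},
		{
			ObjectMeta: metav1.ObjectMeta{Name: "finished-pod"},
			Spec:       v1.PodSpec{NodeName: "node-a"},
			Status:     v1.PodStatus{Phase: v1.PodSucceeded},
		},
		{ObjectMeta: metav1.ObjectMeta{Name: "pending-pod"}},
	}
	for _, p := range pods {
		keys, _ := indexByPodNodeName(p)
		// Only running-pod yields ["node-a"]; the others index to nothing.
		fmt.Printf("%s -> %v\n", p.Name, keys)
	}
}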
@@ -1272,31 +1292,27 @@ func (dsc *DaemonSetsController) simulate(newPod *v1.Pod, node *v1.Node, ds *app
 		})
 	}
 
-	pods := []*v1.Pod{}
-
-	podList, err := dsc.podLister.List(labels.Everything())
+	objects, err := dsc.podNodeIndex.ByIndex("nodeName", node.Name)
 	if err != nil {
 		return nil, nil, err
 	}
-	for _, pod := range podList {
-		if pod.Spec.NodeName != node.Name {
-			continue
-		}
-		if pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
-			continue
-		}
-		// ignore pods that belong to the daemonset when taking into account whether
-		// a daemonset should bind to a node.
+
+	nodeInfo := schedulercache.NewNodeInfo()
+	nodeInfo.SetNode(node)
+
+	for _, obj := range objects {
+		// Ignore pods that belong to the daemonset when taking into account whether a daemonset should bind to a node.
 		// TODO: replace this with metav1.IsControlledBy() in 1.12
+		pod, ok := obj.(*v1.Pod)
+		if !ok {
+			continue
+		}
 		if isControlledByDaemonSet(pod, ds.GetUID()) {
 			continue
 		}
-		pods = append(pods, pod)
+		nodeInfo.AddPod(pod)
 	}
 
-	nodeInfo := schedulercache.NewNodeInfo(pods...)
-	nodeInfo.SetNode(node)
-
 	_, reasons, err := Predicates(newPod, nodeInfo)
 	return reasons, nodeInfo, err
 }
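
The payoff of this rewrite: the old simulate() listed every pod in the cluster and filtered by node on every call, so a full DaemonSet sync cost roughly O(nodes × pods); ByIndex narrows each call to the pods already indexed under that node. A standalone illustration of the two lookup paths against a bare indexer (the pod and node counts are arbitrary):

package main

import (
	"fmt"

	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
		"nodeName": func(obj interface{}) ([]string, error) {
			return []string{obj.(*v1.Pod).Spec.NodeName}, nil
		},
	})

	// 10,000 pods spread across 100 nodes.
	for i := 0; i < 10000; i++ {
		indexer.Add(&v1.Pod{
			ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i)},
			Spec:       v1.PodSpec{NodeName: fmt.Sprintf("node-%d", i%100)},
		})
	}

	// Old path: scan all 10,000 pods and filter, once per simulated node.
	scanned := 0
	for _, obj := range indexer.List() {
		if obj.(*v1.Pod).Spec.NodeName == "node-7" {
			scanned++
		}
	}

	// New path: a single index lookup returns only node-7's 100 pods.
	byIndex, _ := indexer.ByIndex("nodeName", "node-7")
	fmt.Println(scanned, len(byIndex)) // 100 100
}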
@@ -1953,6 +1953,7 @@ func TestNodeShouldRunDaemonPod(t *testing.T) {
 		for _, p := range c.podsOnNode {
 			manager.podStore.Add(p)
 			p.Spec.NodeName = "test-node"
+			manager.podNodeIndex.Add(p)
 		}
 		c.ds.Spec.UpdateStrategy = *strategy
 		wantToRun, shouldSchedule, shouldContinueRunning, err := manager.nodeShouldRunDaemonPod(node, c.ds)