Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-08-04 01:40:07 +00:00)

Avoid creating NodeInfoMap from scratch on every scheduling.

commit 49934c05c0 (parent becb3b44e7)
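In short: the generic scheduler now keeps a long-lived cachedNodeInfoMap and asks the scheduler cache to refresh it in place (UpdateNodeNameToInfoMap) rather than building a fresh map via GetNodeNameToInfoMap on every scheduling attempt. To make the refresh cheap, every NodeInfo carries a generation counter that is bumped on each mutation; an entry is re-cloned only when its generation differs from the cached copy, and entries for removed nodes are pruned.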
plugin/pkg/scheduler/generic_scheduler.go:

@@ -62,6 +62,8 @@ type genericScheduler struct {
 	pods              algorithm.PodLister
 	lastNodeIndexLock sync.Mutex
 	lastNodeIndex     uint64
+
+	cachedNodeInfoMap map[string]*schedulercache.NodeInfo
 }
 
 // Schedule tries to schedule the given pod to one of node in the node list.
@@ -85,13 +87,13 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeListe
 	}
 
 	// Used for all fit and priority funcs.
-	nodeNameToInfo, err := g.cache.GetNodeNameToInfoMap()
+	err = g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap)
 	if err != nil {
 		return "", err
 	}
 
 	trace.Step("Computing predicates")
-	filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, nodeNameToInfo, g.predicates, nodes, g.extenders)
+	filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, g.predicates, nodes, g.extenders)
 	if err != nil {
 		return "", err
 	}
@@ -104,7 +106,7 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeListe
 	}
 
 	trace.Step("Prioritizing")
-	priorityList, err := PrioritizeNodes(pod, nodeNameToInfo, g.prioritizers, algorithm.FakeNodeLister(filteredNodes), g.extenders)
+	priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, g.prioritizers, algorithm.FakeNodeLister(filteredNodes), g.extenders)
 	if err != nil {
 		return "", err
 	}
@@ -333,5 +335,6 @@ func NewGenericScheduler(cache schedulercache.Cache, predicates map[string]algor
 		predicates:        predicates,
 		prioritizers:      prioritizers,
 		extenders:         extenders,
+		cachedNodeInfoMap: make(map[string]*schedulercache.NodeInfo),
 	}
 }
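Note the lifetime change: the map handed to findNodesThatFit and PrioritizeNodes is no longer rebuilt per scheduling attempt. It is allocated once in NewGenericScheduler and refreshed in place at the top of each Schedule call, so cloning work is proportional to how many nodes actually changed since the last cycle (the refresh still walks all nodes, but cloning is typically the expensive part).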
plugin/pkg/scheduler/schedulercache/cache.go:

@@ -74,14 +74,20 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul
 	}
 }
 
-func (cache *schedulerCache) GetNodeNameToInfoMap() (map[string]*NodeInfo, error) {
-	nodeNameToInfo := make(map[string]*NodeInfo)
+func (cache *schedulerCache) UpdateNodeNameToInfoMap(nodeNameToInfo map[string]*NodeInfo) error {
 	cache.mu.Lock()
 	defer cache.mu.Unlock()
 	for name, info := range cache.nodes {
-		nodeNameToInfo[name] = info.Clone()
+		if current, ok := nodeNameToInfo[name]; !ok || current.generation != info.generation {
+			nodeNameToInfo[name] = info.Clone()
+		}
 	}
-	return nodeNameToInfo, nil
+	for name := range nodeNameToInfo {
+		if _, ok := cache.nodes[name]; !ok {
+			delete(nodeNameToInfo, name)
+		}
+	}
+	return nil
 }
 
 func (cache *schedulerCache) List(selector labels.Selector) ([]*api.Pod, error) {
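To make the semantics above concrete, here is a minimal, self-contained sketch of the generation-checked refresh. The NodeInfo and Cache types are simplified stand-ins for illustration (just a pod count and a counter), not the real schedulercache types:

package main

import "fmt"

type NodeInfo struct {
	pods       int
	generation int64
}

func (n *NodeInfo) Clone() *NodeInfo {
	c := *n
	return &c
}

type Cache struct {
	nodes map[string]*NodeInfo
}

// UpdateNodeNameToInfoMap mirrors the commit's semantics:
// re-clone only new or stale entries, prune deleted nodes.
func (c *Cache) UpdateNodeNameToInfoMap(m map[string]*NodeInfo) error {
	for name, info := range c.nodes {
		if cur, ok := m[name]; !ok || cur.generation != info.generation {
			m[name] = info.Clone()
		}
	}
	for name := range m {
		if _, ok := c.nodes[name]; !ok {
			delete(m, name)
		}
	}
	return nil
}

func main() {
	cache := &Cache{nodes: map[string]*NodeInfo{
		"node-1": {pods: 3, generation: 7},
		"node-2": {pods: 1, generation: 2},
	}}
	infoMap := make(map[string]*NodeInfo) // owned by the caller, reused across cycles

	_ = cache.UpdateNodeNameToInfoMap(infoMap) // cycle 1: clones both nodes
	first := infoMap["node-1"]

	_ = cache.UpdateNodeNameToInfoMap(infoMap) // cycle 2: generations unchanged, nothing cloned
	fmt.Println(first == infoMap["node-1"])    // true: same clone reused

	cache.nodes["node-1"].generation++ // node-1 mutated
	delete(cache.nodes, "node-2")      // node-2 removed
	_ = cache.UpdateNodeNameToInfoMap(infoMap)
	fmt.Println(first == infoMap["node-1"]) // false: stale entry re-cloned
	fmt.Println(len(infoMap))               // 1: deleted node pruned
}

The two passes mirror the real implementation: the first clones entries that are new or stale, the second prunes entries whose node has disappeared from the cache.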
plugin/pkg/scheduler/schedulercache/cache_test.go:

@@ -28,6 +28,16 @@ import (
 	priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util"
 )
 
+func deepEqualWithoutGeneration(t *testing.T, testcase int, actual, expected *NodeInfo) {
+	// Ignore generation field.
+	if actual != nil {
+		actual.generation = 0
+	}
+	if !reflect.DeepEqual(actual, expected) {
+		t.Errorf("#%d: node info get=%s, want=%s", testcase, actual, expected)
+	}
+}
+
 // TestAssumePodScheduled tests that after a pod is assumed, its information is aggregated
 // on node level.
 func TestAssumePodScheduled(t *testing.T) {
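This helper exists because every NodeInfo mutation now bumps generation: the expected fixtures these tests build would never again match the cached objects field-for-field. Zeroing actual.generation before reflect.DeepEqual compares everything except the counter, which the remaining hunks in this file switch over to: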
@@ -92,9 +102,7 @@ func TestAssumePodScheduled(t *testing.T) {
 			}
 		}
 		n := cache.nodes[nodeName]
-		if !reflect.DeepEqual(n, tt.wNodeInfo) {
-			t.Errorf("#%d: node info get=%s, want=%s", i, n, tt.wNodeInfo)
-		}
+		deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo)
 	}
 }
 
@@ -154,9 +162,7 @@ func TestExpirePod(t *testing.T) {
 		// pods that have assumedTime + ttl < cleanupTime will get expired and removed
 		cache.cleanupAssumedPods(tt.cleanupTime)
 		n := cache.nodes[nodeName]
-		if !reflect.DeepEqual(n, tt.wNodeInfo) {
-			t.Errorf("#%d: node info get=%s, want=%s", i, n, tt.wNodeInfo)
-		}
+		deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo)
 	}
 }
 
@@ -207,9 +213,7 @@ func TestAddPodWillConfirm(t *testing.T) {
 		cache.cleanupAssumedPods(now.Add(2 * ttl))
 		// check after expiration. confirmed pods shouldn't be expired.
 		n := cache.nodes[nodeName]
-		if !reflect.DeepEqual(n, tt.wNodeInfo) {
-			t.Errorf("#%d: node info get=%s, want=%s", i, n, tt.wNodeInfo)
-		}
+		deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo)
 	}
 }
 
@@ -254,9 +258,7 @@ func TestAddPodAfterExpiration(t *testing.T) {
 		}
 		// check after expiration. confirmed pods shouldn't be expired.
 		n = cache.nodes[nodeName]
-		if !reflect.DeepEqual(n, tt.wNodeInfo) {
-			t.Errorf("#%d: node info get=%s, want=%s", i, n, tt.wNodeInfo)
-		}
+		deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo)
 	}
 }
 
@@ -317,9 +319,7 @@ func TestUpdatePod(t *testing.T) {
 			}
 			// check after expiration. confirmed pods shouldn't be expired.
 			n := cache.nodes[nodeName]
-			if !reflect.DeepEqual(n, tt.wNodeInfo[i-1]) {
-				t.Errorf("#%d: node info get=%s, want=%s", i-1, n, tt.wNodeInfo)
-			}
+			deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo[i-1])
 		}
 	}
 }
@@ -390,9 +390,7 @@ func TestExpireAddUpdatePod(t *testing.T) {
 			}
 			// check after expiration. confirmed pods shouldn't be expired.
 			n := cache.nodes[nodeName]
-			if !reflect.DeepEqual(n, tt.wNodeInfo[i-1]) {
-				t.Errorf("#%d: node info get=%s, want=%s", i-1, n, tt.wNodeInfo)
-			}
+			deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo[i-1])
 		}
 	}
 }
@@ -426,9 +424,7 @@ func TestRemovePod(t *testing.T) {
 			t.Fatalf("AddPod failed: %v", err)
 		}
 		n := cache.nodes[nodeName]
-		if !reflect.DeepEqual(n, tt.wNodeInfo) {
-			t.Errorf("#%d: node info get=%s, want=%s", i, n, tt.wNodeInfo)
-		}
+		deepEqualWithoutGeneration(t, i, n, tt.wNodeInfo)
 
 		if err := cache.RemovePod(tt.pod); err != nil {
 			t.Fatalf("RemovePod failed: %v", err)
@@ -441,14 +437,6 @@ func TestRemovePod(t *testing.T) {
 	}
 }
 
-func BenchmarkGetNodeNameToInfoMap1kNodes30kPods(b *testing.B) {
-	cache := setupCacheOf1kNodes30kPods(b)
-	b.ResetTimer()
-	for n := 0; n < b.N; n++ {
-		cache.GetNodeNameToInfoMap()
-	}
-}
-
 func BenchmarkList1kNodes30kPods(b *testing.B) {
 	cache := setupCacheOf1kNodes30kPods(b)
 	b.ResetTimer()
plugin/pkg/scheduler/schedulercache/interface.go:

@@ -80,9 +80,10 @@ type Cache interface {
 	// RemoveNode removes overall information about node.
 	RemoveNode(node *api.Node) error
 
-	// GetNodeNameToInfoMap returns a map of node names to node info. The node info contains
-	// aggregated information of pods scheduled (including assumed to be) on this node.
-	GetNodeNameToInfoMap() (map[string]*NodeInfo, error)
+	// UpdateNodeNameToInfoMap updates the passed infoMap to the current contents of Cache.
+	// The node info contains aggregated information of pods scheduled (including assumed to be)
+	// on this node.
+	UpdateNodeNameToInfoMap(infoMap map[string]*NodeInfo) error
 
 	// List lists all cached pods (including assumed ones).
 	List(labels.Selector) ([]*api.Pod, error)
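The contract change here is about ownership: instead of returning a freshly built map, implementations now mutate the infoMap the caller hands in, refreshing stale entries and pruning names no longer present in the cache. An implementation that merely rebinds the parameter to a new map leaves the caller's map unchanged (see the note at the end of this diff).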
plugin/pkg/scheduler/schedulercache/node_info.go:

@@ -39,6 +39,10 @@ type NodeInfo struct {
 	requestedResource *Resource
 	pods              []*api.Pod
 	nonzeroRequest    *Resource
+
+	// Whenever NodeInfo changes, generation is bumped.
+	// This is used to avoid cloning it if the object didn't change.
+	generation int64
 }
 
 // Resource is a collection of compute resource.
@@ -55,6 +59,7 @@ func NewNodeInfo(pods ...*api.Pod) *NodeInfo {
 	ni := &NodeInfo{
 		requestedResource: &Resource{},
 		nonzeroRequest:    &Resource{},
+		generation:        0,
 	}
 	for _, pod := range pods {
 		ni.addPod(pod)
@@ -101,6 +106,7 @@ func (n *NodeInfo) Clone() *NodeInfo {
 		requestedResource: &(*n.requestedResource),
 		nonzeroRequest:    &(*n.nonzeroRequest),
 		pods:              pods,
+		generation:        n.generation,
 	}
 	return clone
 }
@@ -123,6 +129,7 @@ func (n *NodeInfo) addPod(pod *api.Pod) {
 	n.nonzeroRequest.MilliCPU += non0_cpu
 	n.nonzeroRequest.Memory += non0_mem
 	n.pods = append(n.pods, pod)
+	n.generation++
 }
 
 // removePod subtracts pod information to this NodeInfo.
@@ -149,6 +156,7 @@ func (n *NodeInfo) removePod(pod *api.Pod) error {
 			n.requestedResource.NvidiaGPU -= nvidia_gpu
 			n.nonzeroRequest.MilliCPU -= non0_cpu
 			n.nonzeroRequest.Memory -= non0_mem
+			n.generation++
 			return nil
 		}
 	}
@@ -173,6 +181,7 @@ func calculateResource(pod *api.Pod) (cpu int64, mem int64, nvidia_gpu int64, no
 // Sets the overall node information.
 func (n *NodeInfo) SetNode(node *api.Node) error {
 	n.node = node
+	n.generation++
 	return nil
 }
 
@@ -183,6 +192,7 @@ func (n *NodeInfo) RemoveNode(node *api.Node) error {
 	// and thus can potentially be observed later, even though they happened before
 	// node removal. This is handled correctly in cache.go file.
 	n.node = nil
+	n.generation++
 	return nil
 }
 
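A small illustration of why this bookkeeping is spread across Clone and every mutator (addPod, removePod, SetNode, RemoveNode): Clone must copy generation so a cached copy can later be compared against its source, and each mutator must bump it so that comparison actually detects staleness. Simplified stand-in types again, not the real scheduler code:

package main

import "fmt"

type NodeInfo struct {
	pods       []string
	generation int64
}

func (n *NodeInfo) Clone() *NodeInfo {
	// Copy generation along with the data, so "cached.generation ==
	// source.generation" means "cached copy is up to date".
	return &NodeInfo{pods: append([]string(nil), n.pods...), generation: n.generation}
}

func (n *NodeInfo) addPod(pod string) {
	n.pods = append(n.pods, pod)
	n.generation++ // forgetting this would make stale clones look fresh
}

func main() {
	src := &NodeInfo{}
	src.addPod("pod-a")

	cached := src.Clone()
	fmt.Println(cached.generation == src.generation) // true: cache is fresh

	src.addPod("pod-b")
	fmt.Println(cached.generation == src.generation) // false: refresh needed
}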
plugin/pkg/scheduler/testing/fake_cache.go:

@@ -44,8 +44,8 @@ func (f *FakeCache) UpdateNode(oldNode, newNode *api.Node) error { return nil }
 
 func (f *FakeCache) RemoveNode(node *api.Node) error { return nil }
 
-func (f *FakeCache) GetNodeNameToInfoMap() (map[string]*schedulercache.NodeInfo, error) {
-	return nil, nil
+func (f *FakeCache) UpdateNodeNameToInfoMap(infoMap map[string]*schedulercache.NodeInfo) error {
+	return nil
 }
 
 func (f *FakeCache) List(s labels.Selector) ([]*api.Pod, error) { return nil, nil }
plugin/pkg/scheduler/testing/pods_to_cache.go:

@@ -41,8 +41,9 @@ func (p PodsToCache) UpdateNode(oldNode, newNode *api.Node) error { return nil }
 
 func (p PodsToCache) RemoveNode(node *api.Node) error { return nil }
 
-func (p PodsToCache) GetNodeNameToInfoMap() (map[string]*schedulercache.NodeInfo, error) {
-	return schedulercache.CreateNodeNameToInfoMap(p), nil
+func (p PodsToCache) UpdateNodeNameToInfoMap(infoMap map[string]*schedulercache.NodeInfo) error {
+	infoMap = schedulercache.CreateNodeNameToInfoMap(p)
+	return nil
 }
 
 func (p PodsToCache) List(s labels.Selector) (selected []*api.Pod, err error) {
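A Go detail worth keeping in mind when reading the PodsToCache change above: assigning a new map to the infoMap parameter rebinds only the callee's local variable, so the caller observes changes made through the shared map (stores and deletes) but not reassignment. A tiny standalone demonstration:

package main

import "fmt"

func reassign(m map[string]int) {
	m = map[string]int{"a": 1} // rebinds the local parameter; invisible to the caller
}

func mutate(m map[string]int) {
	m["a"] = 1 // writes through the shared map; visible to the caller
}

func main() {
	m1 := map[string]int{}
	reassign(m1)
	fmt.Println(len(m1)) // 0

	m2 := map[string]int{}
	mutate(m2)
	fmt.Println(len(m2)) // 1
}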