Update ecache and add scheduler method

parent 560c46574e
commit 50eaeaa7bd
@@ -91,8 +91,8 @@ func PredicateMetadata(pod *api.Pod, nodeInfoMap map[string]*schedulercache.Node
     }
     return &predicateMetadata{
         podBestEffort: isPodBestEffort(pod),
-        podRequest:    getResourceRequest(pod),
-        podPorts:      getUsedPorts(pod),
+        podRequest:    GetResourceRequest(pod),
+        podPorts:      GetUsedPorts(pod),
         matchingAntiAffinityTerms: matchingTerms,
     }
 }
@@ -417,7 +417,7 @@ func (c *VolumeZoneChecker) predicate(pod *api.Pod, meta interface{}, nodeInfo *
     return true, nil, nil
 }
 
-func getResourceRequest(pod *api.Pod) *schedulercache.Resource {
+func GetResourceRequest(pod *api.Pod) *schedulercache.Resource {
     result := schedulercache.Resource{}
     for _, container := range pod.Spec.Containers {
         requests := container.Resources.Requests
@@ -459,7 +459,7 @@ func PodFitsResources(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.N
         podRequest = predicateMeta.podRequest
     } else {
         // We couldn't parse metadata - fallback to computing it.
-        podRequest = getResourceRequest(pod)
+        podRequest = GetResourceRequest(pod)
     }
     if podRequest.MilliCPU == 0 && podRequest.Memory == 0 && podRequest.NvidiaGPU == 0 {
         return len(predicateFails) == 0, predicateFails, nil
@@ -702,14 +702,14 @@ func PodFitsHostPorts(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.N
         wantPorts = predicateMeta.podPorts
     } else {
         // We couldn't parse metadata - fallback to computing it.
-        wantPorts = getUsedPorts(pod)
+        wantPorts = GetUsedPorts(pod)
     }
     if len(wantPorts) == 0 {
         return true, nil, nil
     }
 
     // TODO: Aggregate it at the NodeInfo level.
-    existingPorts := getUsedPorts(nodeInfo.Pods()...)
+    existingPorts := GetUsedPorts(nodeInfo.Pods()...)
     for wport := range wantPorts {
         if wport != 0 && existingPorts[wport] {
             return false, []algorithm.PredicateFailureReason{ErrPodNotFitsHostPorts}, nil
@@ -718,7 +718,7 @@ func PodFitsHostPorts(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.N
     return true, nil, nil
 }
 
-func getUsedPorts(pods ...*api.Pod) map[int]bool {
+func GetUsedPorts(pods ...*api.Pod) map[int]bool {
     ports := make(map[int]bool)
     for _, pod := range pods {
         for j := range pod.Spec.Containers {
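
Note: the hunks above rename getResourceRequest and getUsedPorts to GetResourceRequest and GetUsedPorts, exporting them from the predicates package so that other scheduler code (such as the new equivalence cache) can reuse them. The following is a minimal, self-contained sketch of the GetUsedPorts logic visible in these hunks; the stand-in Pod/Container/ContainerPort types and the main function are illustrative assumptions, not the real API types.

package main

import "fmt"

// Simplified stand-ins for api.Pod and its nested types; the real definitions
// live in the Kubernetes API package and carry many more fields.
type ContainerPort struct{ HostPort int }
type Container struct{ Ports []ContainerPort }
type PodSpec struct{ Containers []Container }
type Pod struct{ Spec PodSpec }

// GetUsedPorts mirrors the helper exported by this diff: it collects every
// host port requested by the containers of the given pods.
func GetUsedPorts(pods ...*Pod) map[int]bool {
    ports := make(map[int]bool)
    for _, pod := range pods {
        for j := range pod.Spec.Containers {
            container := &pod.Spec.Containers[j]
            for k := range container.Ports {
                ports[container.Ports[k].HostPort] = true
            }
        }
    }
    return ports
}

func main() {
    pod := &Pod{Spec: PodSpec{Containers: []Container{
        {Ports: []ContainerPort{{HostPort: 8080}, {HostPort: 9090}}},
    }}}
    fmt.Println(GetUsedPorts(pod)) // map[8080:true 9090:true]
}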
@@ -467,7 +467,7 @@ func TestGetUsedPorts(t *testing.T) {
     }
 
     for _, test := range tests {
-        ports := getUsedPorts(test.pods...)
+        ports := GetUsedPorts(test.pods...)
         if !reflect.DeepEqual(test.ports, ports) {
             t.Errorf("%s: expected %v, got %v", "test get used ports", test.ports, ports)
         }
@@ -78,6 +78,10 @@ type ConfigFactory struct {
 
     scheduledPodPopulator *cache.Controller
     nodePopulator         *cache.Controller
+    pvPopulator           *cache.Controller
+    pvcPopulator          *cache.Controller
+    servicePopulator      *cache.Controller
+    controllerPopulator   *cache.Controller
 
     schedulerCache schedulercache.Cache
 
@@ -93,6 +97,9 @@ type ConfigFactory struct {
 
     // Indicate the "all topologies" set for empty topologyKey when it's used for PreferredDuringScheduling pod anti-affinity.
     FailureDomains string
+
+    // Equivalence class cache
+    EquivalencePodCache *scheduler.EquivalenceCache
 }
 
 // Initializes the factory.
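
Note: the new EquivalencePodCache field refers to a scheduler.EquivalenceCache type that is not shown in this diff. The sketch below only illustrates the general idea of an equivalence-class cache, i.e. memoizing predicate results per (equivalence class, node); every type and method name in it is hypothetical and may differ from the real implementation.

package main

import (
    "fmt"
    "sync"
)

// predicateResult is a cached verdict for one (equivalence class, node) pair.
type predicateResult struct {
    Fit     bool
    Reasons []string
}

// EquivalenceCacheSketch maps an equivalence-class key to per-node cached
// predicate results. Pods that are scheduling-equivalent (e.g. created by the
// same controller with identical requirements) share one key.
type EquivalenceCacheSketch struct {
    mu    sync.RWMutex
    cache map[uint64]map[string]predicateResult
}

func NewEquivalenceCacheSketch() *EquivalenceCacheSketch {
    return &EquivalenceCacheSketch{cache: make(map[uint64]map[string]predicateResult)}
}

// Store records the predicate outcome for a class on a node.
func (ec *EquivalenceCacheSketch) Store(class uint64, node string, r predicateResult) {
    ec.mu.Lock()
    defer ec.mu.Unlock()
    if ec.cache[class] == nil {
        ec.cache[class] = make(map[string]predicateResult)
    }
    ec.cache[class][node] = r
}

// Lookup returns a cached outcome, if any, so equivalent pods can skip
// re-running the predicates against that node.
func (ec *EquivalenceCacheSketch) Lookup(class uint64, node string) (predicateResult, bool) {
    ec.mu.RLock()
    defer ec.mu.RUnlock()
    r, ok := ec.cache[class][node]
    return r, ok
}

func main() {
    ec := NewEquivalenceCacheSketch()
    ec.Store(42, "node-1", predicateResult{Fit: true})
    if r, ok := ec.Lookup(42, "node-1"); ok {
        fmt.Println("cached fit for node-1:", r.Fit)
    }
}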
@@ -147,15 +154,48 @@ func NewConfigFactory(client clientset.Interface, schedulerName string, hardPodA
         },
     )
 
+    // TODO(harryz) need to fill all the handlers here and below for equivalence cache
+    c.PVLister.Store, c.pvPopulator = cache.NewInformer(
+        c.createPersistentVolumeLW(),
+        &api.PersistentVolume{},
+        0,
+        cache.ResourceEventHandlerFuncs{},
+    )
+
+    c.PVCLister.Store, c.pvcPopulator = cache.NewInformer(
+        c.createPersistentVolumeClaimLW(),
+        &api.PersistentVolumeClaim{},
+        0,
+        cache.ResourceEventHandlerFuncs{},
+    )
+
+    c.ServiceLister.Indexer, c.servicePopulator = cache.NewIndexerInformer(
+        c.createServiceLW(),
+        &api.Service{},
+        0,
+        cache.ResourceEventHandlerFuncs{},
+        cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
+    )
+
+    c.ControllerLister.Indexer, c.controllerPopulator = cache.NewIndexerInformer(
+        c.createControllerLW(),
+        &api.ReplicationController{},
+        0,
+        cache.ResourceEventHandlerFuncs{},
+        cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
+    )
+
     return c
 }
 
+// TODO(harryz) need to update all the handlers here and below for equivalence cache
 func (c *ConfigFactory) addPodToCache(obj interface{}) {
     pod, ok := obj.(*api.Pod)
     if !ok {
         glog.Errorf("cannot convert to *api.Pod: %v", obj)
         return
     }
+
     if err := c.schedulerCache.AddPod(pod); err != nil {
         glog.Errorf("scheduler cache AddPod failed: %v", err)
     }
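
Note: the informers above are wired with empty cache.ResourceEventHandlerFuncs{}, and the TODO(harryz) comments say the handlers still need to be filled in for the equivalence cache. The sketch below illustrates, with a stand-in handler struct instead of the real client cache package, how such handlers could invalidate cached results on add/update/delete events; the invalidate callback is a hypothetical hook, not a method present in this diff.

package main

import "fmt"

// eventHandlerFuncs is a stand-in for cache.ResourceEventHandlerFuncs: three
// optional callbacks an informer invokes when a watched object is added,
// updated, or deleted.
type eventHandlerFuncs struct {
    AddFunc    func(obj interface{})
    UpdateFunc func(oldObj, newObj interface{})
    DeleteFunc func(obj interface{})
}

// invalidatingHandlers shows one way the empty handler sets in this hunk could
// later be filled in: any change to a watched object triggers invalidation of
// the relevant equivalence-cache entries.
func invalidatingHandlers(invalidate func(event string, obj interface{})) eventHandlerFuncs {
    return eventHandlerFuncs{
        AddFunc:    func(obj interface{}) { invalidate("add", obj) },
        UpdateFunc: func(oldObj, newObj interface{}) { invalidate("update", newObj) },
        DeleteFunc: func(obj interface{}) { invalidate("delete", obj) },
    }
}

func main() {
    h := invalidatingHandlers(func(event string, obj interface{}) {
        fmt.Printf("invalidate equivalence cache on %s of %v\n", event, obj)
    })
    // Simulate an informer delivering a service event.
    h.AddFunc("service/default/frontend")
}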
@@ -172,6 +212,7 @@ func (c *ConfigFactory) updatePodInCache(oldObj, newObj interface{}) {
         glog.Errorf("cannot convert newObj to *api.Pod: %v", newObj)
         return
     }
+
     if err := c.schedulerCache.UpdatePod(oldPod, newPod); err != nil {
         glog.Errorf("scheduler cache UpdatePod failed: %v", err)
     }
@@ -204,6 +245,7 @@ func (c *ConfigFactory) addNodeToCache(obj interface{}) {
         glog.Errorf("cannot convert to *api.Node: %v", obj)
         return
     }
+
     if err := c.schedulerCache.AddNode(node); err != nil {
         glog.Errorf("scheduler cache AddNode failed: %v", err)
     }
@@ -220,6 +262,7 @@ func (c *ConfigFactory) updateNodeInCache(oldObj, newObj interface{}) {
         glog.Errorf("cannot convert newObj to *api.Node: %v", newObj)
         return
     }
+
     if err := c.schedulerCache.UpdateNode(oldNode, newNode); err != nil {
         glog.Errorf("scheduler cache UpdateNode failed: %v", err)
     }
@@ -407,20 +450,15 @@ func (f *ConfigFactory) Run() {
     // Begin populating nodes.
     go f.nodePopulator.Run(f.StopEverything)
 
-    // Watch PVs & PVCs
-    // They may be listed frequently for scheduling constraints, so provide a local up-to-date cache.
-    cache.NewReflector(f.createPersistentVolumeLW(), &api.PersistentVolume{}, f.PVLister.Store, 0).RunUntil(f.StopEverything)
-    cache.NewReflector(f.createPersistentVolumeClaimLW(), &api.PersistentVolumeClaim{}, f.PVCLister.Store, 0).RunUntil(f.StopEverything)
+    // Begin populating pv & pvc
+    go f.pvPopulator.Run(f.StopEverything)
+    go f.pvcPopulator.Run(f.StopEverything)
 
-    // Watch and cache all service objects. Scheduler needs to find all pods
-    // created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly.
-    // Cache this locally.
-    cache.NewReflector(f.createServiceLW(), &api.Service{}, f.ServiceLister.Indexer, 0).RunUntil(f.StopEverything)
+    // Begin populating services
+    go f.servicePopulator.Run(f.StopEverything)
 
-    // Watch and cache all ReplicationController objects. Scheduler needs to find all pods
-    // created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly.
-    // Cache this locally.
-    cache.NewReflector(f.createControllerLW(), &api.ReplicationController{}, f.ControllerLister.Indexer, 0).RunUntil(f.StopEverything)
+    // Begin populating controllers
+    go f.controllerPopulator.Run(f.StopEverything)
 
     // Watch and cache all ReplicaSet objects. Scheduler needs to find all pods
     // created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly.
@@ -71,6 +71,8 @@ type genericScheduler struct {
     lastNodeIndex     uint64
 
     cachedNodeInfoMap map[string]*schedulercache.NodeInfo
+
+    equivalenceCache *EquivalenceCache
 }
 
 // Schedule tries to schedule the given pod to one of node in the node list.
@@ -99,6 +101,8 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeListe
         return "", err
     }
 
+    // TODO(harryz) Check if equivalenceCache is enabled and call scheduleWithEquivalenceClass here
+
     trace.Step("Computing predicates")
     filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders)
     if err != nil {
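
Note: the TODO above mentions a scheduleWithEquivalenceClass path that does not yet exist in this diff. The sketch below is a hypothetical, self-contained illustration of what such a path could look like: derive an equivalence key for the pod, reuse cached predicate verdicts per (key, node), and only run the predicates on cache misses. All names and types are assumptions for illustration.

package main

import "fmt"

// pod is a stand-in with just enough information to derive an equivalence key.
type pod struct {
    OwnerKey string // pods created by the same controller are treated as equivalent
}

// predicateFunc is a simplified fit predicate: does this pod fit on this node?
type predicateFunc func(p pod, node string) bool

// scheduleWithEquivalenceClass filters nodes for a pod, reusing cached
// predicate verdicts for pods in the same equivalence class and only running
// the predicates on cache misses.
func scheduleWithEquivalenceClass(
    p pod,
    nodes []string,
    predicates []predicateFunc,
    cache map[string]map[string]bool, // equivalence key -> node -> fits
) []string {
    key := p.OwnerKey
    if cache[key] == nil {
        cache[key] = make(map[string]bool)
    }
    var fit []string
    for _, node := range nodes {
        fits, cached := cache[key][node]
        if !cached {
            fits = true
            for _, pred := range predicates {
                if !pred(p, node) {
                    fits = false
                    break
                }
            }
            cache[key][node] = fits // memoize for later equivalent pods
        }
        if fits {
            fit = append(fit, node)
        }
    }
    return fit
}

func main() {
    cache := map[string]map[string]bool{}
    notNode2 := func(p pod, node string) bool { return node != "node-2" }
    first := scheduleWithEquivalenceClass(pod{OwnerKey: "rc/frontend"}, []string{"node-1", "node-2"}, []predicateFunc{notNode2}, cache)
    fmt.Println(first) // [node-1]
    // A second, equivalent pod hits the cache instead of re-running predicates.
    second := scheduleWithEquivalenceClass(pod{OwnerKey: "rc/frontend"}, []string{"node-1", "node-2"}, []predicateFunc{notNode2}, cache)
    fmt.Println(second) // [node-1]
}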
@@ -300,6 +300,7 @@ func TestGenericScheduler(t *testing.T) {
         for _, name := range test.nodes {
             cache.AddNode(&api.Node{ObjectMeta: api.ObjectMeta{Name: name}})
         }
+
         scheduler := NewGenericScheduler(
             cache, test.predicates, algorithm.EmptyMetadataProducer,
             test.prioritizers, []algorithm.SchedulerExtender{})