mirror of https://github.com/k3s-io/kubernetes.git

move filter logic to list&watch client

parent 2efc738d5b, commit e58eae313e
@@ -427,20 +427,6 @@ func getUsedPorts(pods ...*api.Pod) map[int]bool {
 	return ports
 }
 
-func filterNonRunningPods(pods []*api.Pod) []*api.Pod {
-	if len(pods) == 0 {
-		return pods
-	}
-	result := []*api.Pod{}
-	for _, pod := range pods {
-		if pod.Status.Phase == api.PodSucceeded || pod.Status.Phase == api.PodFailed {
-			continue
-		}
-		result = append(result, pod)
-	}
-	return result
-}
-
 // MapPodsToMachines obtains a list of pods and pivots that list into a map where the keys are host names
 // and the values are the list of pods running on that host.
 func MapPodsToMachines(lister algorithm.PodLister) (map[string][]*api.Pod, error) {
@@ -450,7 +436,6 @@ func MapPodsToMachines(lister algorithm.PodLister) (map[string][]*api.Pod, error
 	if err != nil {
 		return map[string][]*api.Pod{}, err
 	}
-	pods = filterNonRunningPods(pods)
 	for _, scheduledPod := range pods {
 		host := scheduledPod.Spec.NodeName
 		machineToPods[host] = append(machineToPods[host], scheduledPod)
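For context on what moved: the deleted helper pruned Succeeded and Failed pods in memory, after the full pod list had already been fetched from the apiserver. A minimal standalone sketch of that old behavior (local stand-in types, not the real api package, so it runs on its own):

package main

import "fmt"

// PodPhase and Pod are local stand-ins for the api package types.
type PodPhase string

const (
	PodRunning   PodPhase = "Running"
	PodSucceeded PodPhase = "Succeeded"
	PodFailed    PodPhase = "Failed"
)

type Pod struct {
	Name  string
	Phase PodPhase
}

// filterNonRunning mirrors the deleted filterNonRunningPods: terminated pods
// are dropped client-side, after they have already crossed the wire.
func filterNonRunning(pods []*Pod) []*Pod {
	result := []*Pod{}
	for _, pod := range pods {
		if pod.Phase == PodSucceeded || pod.Phase == PodFailed {
			continue
		}
		result = append(result, pod)
	}
	return result
}

func main() {
	pods := []*Pod{{"a", PodRunning}, {"b", PodFailed}, {"c", PodSucceeded}}
	for _, p := range filterNonRunning(pods) {
		fmt.Println(p.Name) // prints only "a"
	}
}

After this commit the same predicate lives in the field selector of the scheduler's list&watch clients (see the ConfigFactory hunks below), so terminated pods are excluded server-side and never reach this code path.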
@@ -90,7 +90,7 @@ func NewConfigFactory(client *client.Client, rateLimiter util.RateLimiter) *Conf
 	// ScheduledPodLister is something we provide to plug in functions that
 	// they may need to call.
 	c.ScheduledPodLister.Store, c.scheduledPodPopulator = framework.NewInformer(
-		c.createAssignedPodLW(),
+		c.createAssignedNonTerminatedPodLW(),
 		&api.Pod{},
 		0,
 		framework.ResourceEventHandlerFuncs{
@@ -190,7 +190,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
 	}
 
 	// Watch and queue pods that need scheduling.
-	cache.NewReflector(f.createUnassignedPodLW(), &api.Pod{}, f.PodQueue, 0).RunUntil(f.StopEverything)
+	cache.NewReflector(f.createUnassignedNonTerminatedPodLW(), &api.Pod{}, f.PodQueue, 0).RunUntil(f.StopEverything)
 
 	// Begin populating scheduled pods.
 	go f.scheduledPodPopulator.Run(f.StopEverything)
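The two call-site changes above are the whole consumer-side story: the scheduled-pod informer and the unassigned-pod reflector keep their shape and simply read from pre-filtered sources. A minimal sketch of that pattern, with a hypothetical stand-in lister rather than the real cache.ListWatch:

package main

import "fmt"

// lister is a stand-in for the List half of cache.ListWatch.
type lister func() []string

// newFilteredLister applies its filter at the source, the way the apiserver
// applies a field selector: every consumer of the returned lister sees only
// matching items and needs no filtering logic of its own.
func newFilteredLister(items []string, keep func(string) bool) lister {
	return func() []string {
		out := []string{}
		for _, it := range items {
			if keep(it) {
				out = append(out, it)
			}
		}
		return out
	}
}

func main() {
	pods := []string{"running-pod", "failed-pod", "succeeded-pod"}
	nonTerminated := newFilteredLister(pods, func(name string) bool {
		return name != "failed-pod" && name != "succeeded-pod"
	})
	// The consumer (informer/reflector in the real code) stays unchanged.
	fmt.Println(nonTerminated()) // [running-pod]
}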
@@ -257,16 +257,17 @@ func getNodeConditionPredicate() cache.NodeConditionPredicate {
 
 // Returns a cache.ListWatch that finds all pods that need to be
 // scheduled.
-func (factory *ConfigFactory) createUnassignedPodLW() *cache.ListWatch {
-	return cache.NewListWatchFromClient(factory.Client, "pods", api.NamespaceAll, fields.Set{client.PodHost: ""}.AsSelector())
+func (factory *ConfigFactory) createUnassignedNonTerminatedPodLW() *cache.ListWatch {
+	selector := fields.ParseSelectorOrDie("spec.nodeName==" + "" + ",status.phase!=" + string(api.PodSucceeded) + ",status.phase!=" + string(api.PodFailed))
+	return cache.NewListWatchFromClient(factory.Client, "pods", api.NamespaceAll, selector)
 }
 
 // Returns a cache.ListWatch that finds all pods that are
 // already scheduled.
 // TODO: return a ListerWatcher interface instead?
-func (factory *ConfigFactory) createAssignedPodLW() *cache.ListWatch {
-	return cache.NewListWatchFromClient(factory.Client, "pods", api.NamespaceAll,
-		fields.ParseSelectorOrDie(client.PodHost+"!="))
+func (factory *ConfigFactory) createAssignedNonTerminatedPodLW() *cache.ListWatch {
+	selector := fields.ParseSelectorOrDie("spec.nodeName!=" + "" + ",status.phase!=" + string(api.PodSucceeded) + ",status.phase!=" + string(api.PodFailed))
+	return cache.NewListWatchFromClient(factory.Client, "pods", api.NamespaceAll, selector)
 }
 
 // createNodeLW returns a cache.ListWatch that gets all changes to nodes.
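For reference, a standalone sketch printing the literal selector strings the two constructors compose; the empty string spliced in after spec.nodeName== / spec.nodeName!= is deliberate, selecting pods whose node name is unset or set respectively:

package main

import "fmt"

func main() {
	// Plain constants stand in for string(api.PodSucceeded) and
	// string(api.PodFailed); the concatenation matches the constructors above.
	const succeeded, failed = "Succeeded", "Failed"

	unassigned := "spec.nodeName==" + "" + ",status.phase!=" + succeeded + ",status.phase!=" + failed
	assigned := "spec.nodeName!=" + "" + ",status.phase!=" + succeeded + ",status.phase!=" + failed

	fmt.Println(unassigned) // spec.nodeName==,status.phase!=Succeeded,status.phase!=Failed
	fmt.Println(assigned)   // spec.nodeName!=,status.phase!=Succeeded,status.phase!=Failed
}

Because the apiserver evaluates the selector, both the initial list and the watch stream arrive already pruned, which also keeps terminated pods out of the scheduler's caches.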
@@ -252,38 +252,6 @@ func TestGenericScheduler(t *testing.T) {
 			expectsErr: true,
 			name:       "test 8",
 		},
-		{
-			predicates: map[string]algorithm.FitPredicate{
-				"nopods":  hasNoPodsPredicate,
-				"matches": matchesPredicate,
-			},
-			pods: []*api.Pod{
-				{
-					ObjectMeta: api.ObjectMeta{Name: "2"},
-					Spec: api.PodSpec{
-						NodeName: "2",
-					},
-					Status: api.PodStatus{
-						Phase: api.PodFailed,
-					},
-				},
-				{
-					ObjectMeta: api.ObjectMeta{Name: "3"},
-					Spec: api.PodSpec{
-						NodeName: "2",
-					},
-					Status: api.PodStatus{
-						Phase: api.PodSucceeded,
-					},
-				},
-			},
-			pod: &api.Pod{ObjectMeta: api.ObjectMeta{Name: "2"}},
-
-			prioritizers: []algorithm.PriorityConfig{{Function: numericPriority, Weight: 1}},
-			nodes:        []string{"1", "2"},
-			expectedHost: "2",
-			name:         "test 9",
-		},
 	}
 
 	for _, test := range tests {
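The deleted case ("test 9") asserted that two terminated pods bound to node "2" did not prevent scheduling there. With filtering moved into the list&watch client the scenario can no longer occur, because terminated pods never enter the scheduler's pod cache. A standalone sketch of the invariant that the test covered (stand-in types, not the real test fixtures):

package main

import "fmt"

type pod struct {
	name, nodeName, phase string
}

// nonTerminated mimics the apiserver-side field selector
// status.phase!=Succeeded,status.phase!=Failed.
func nonTerminated(pods []pod) []pod {
	out := []pod{}
	for _, p := range pods {
		if p.phase != "Succeeded" && p.phase != "Failed" {
			out = append(out, p)
		}
	}
	return out
}

func main() {
	// The two pods from the deleted test case, both terminated on node "2".
	cached := nonTerminated([]pod{
		{name: "2", nodeName: "2", phase: "Failed"},
		{name: "3", nodeName: "2", phase: "Succeeded"},
	})
	// The scheduler's view of node "2" is empty, so a predicate requiring an
	// empty node (like the test's hasNoPodsPredicate) passes there.
	fmt.Println(len(cached)) // 0
}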