move scheduler cache ListNodes interface to snapshot

Abdullah Gharaibeh 2019-10-14 17:47:42 -04:00
parent 34db57b007
commit e073e56095
9 changed files with 26 additions and 48 deletions

View File

@ -30,13 +30,6 @@ var NodeFieldSelectorKeys = map[string]func(*v1.Node) string{
schedulerapi.NodeFieldSelectorKeyNodeName: func(n *v1.Node) string { return n.Name },
}
// NodeLister interface represents anything that can list nodes for a scheduler.
type NodeLister interface {
// We explicitly return []*v1.Node, instead of v1.NodeList, to avoid
// performing expensive copies that are unneeded.
ListNodes() []*v1.Node
}
// PodFilter is a function to filter a pod. If pod passed return true else return false.
type PodFilter func(*v1.Pod) bool
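
The comment on the removed interface is worth keeping in mind: the scheduler deliberately passes []*v1.Node rather than a v1.NodeList, because a slice of pointers copies only pointers while a NodeList-style value holds nodes by value and copies whole structs. A minimal sketch of the difference, using simplified stand-in types rather than the real k8s.io/api types:

// Illustrative sketch only: Node and NodeList are simplified stand-ins.
package main

import "fmt"

// Node stands in for v1.Node, which carries many large fields in practice.
type Node struct {
	Name   string
	Labels map[string]string
}

// NodeList mirrors the v1.NodeList shape: nodes held by value.
type NodeList struct {
	Items []Node
}

// listByPointer copies only the pointers; the Node structs are shared.
func listByPointer(src []*Node) []*Node {
	out := make([]*Node, 0, len(src))
	out = append(out, src...)
	return out
}

// listByValue copies every Node struct into the list.
func listByValue(src []*Node) NodeList {
	out := NodeList{Items: make([]Node, 0, len(src))}
	for _, n := range src {
		out.Items = append(out.Items, *n)
	}
	return out
}

func main() {
	nodes := []*Node{{Name: "node-1"}, {Name: "node-2"}}
	fmt.Println(len(listByPointer(nodes)), len(listByValue(nodes).Items))
}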

View File

@ -41,7 +41,6 @@ type PluginFactoryArgs struct {
ControllerLister algorithm.ControllerLister
ReplicaSetLister algorithm.ReplicaSetLister
StatefulSetLister algorithm.StatefulSetLister
NodeLister algorithm.NodeLister
PDBLister algorithm.PDBLister
NodeInfo predicates.NodeInfo
CSINodeInfo predicates.CSINodeInfo

View File

@ -321,11 +321,10 @@ func (g *genericScheduler) Preempt(state *framework.CycleState, pod *v1.Pod, sch
klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
return nil, nil, nil, nil
}
allNodes := g.cache.ListNodes()
if len(allNodes) == 0 {
if len(g.nodeInfoSnapshot.NodeInfoMap) == 0 {
return nil, nil, nil, ErrNoNodesAvailable
}
potentialNodes := nodesWherePreemptionMightHelp(allNodes, fitError)
potentialNodes := nodesWherePreemptionMightHelp(g.nodeInfoSnapshot.NodeInfoMap, fitError)
if len(potentialNodes) == 0 {
klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
// In this case, we should clean-up any existing nominated node name of the pod.
@ -460,7 +459,7 @@ func (g *genericScheduler) findNodesThatFit(state *framework.CycleState, pod *v1
filteredNodesStatuses := framework.NodeToStatusMap{}
if len(g.predicates) == 0 {
filtered = g.cache.ListNodes()
filtered = g.nodeInfoSnapshot.ListNodes()
} else {
allNodes := int32(g.cache.NodeTree().NumNodes())
numNodesToFind := g.numFeasibleNodesToFind(allNodes)
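
With the snapshot in place, the no-predicates fast path above returns every snapshot node instead of asking the cache, and the full path still stops once enough feasible nodes are found. A rough sketch of that branch, assuming a hypothetical predicate type and a snapshot collapsed to bare nodes (not the scheduler's real types):

// Illustrative sketch only: simplified stand-ins for the scheduler's types.
package main

import "fmt"

type Node struct{ Name string }

// snapshot collapses NodeInfoMap to bare node pointers for brevity.
type snapshot struct{ nodes map[string]*Node }

func (s *snapshot) ListNodes() []*Node {
	out := make([]*Node, 0, len(s.nodes))
	for _, n := range s.nodes {
		out = append(out, n)
	}
	return out
}

// predicate is a hypothetical fit check: true means the node can run the pod.
type predicate func(*Node) bool

// findNodesThatFit mirrors the branch in the hunk: with no predicates every
// snapshot node is feasible; otherwise stop after numNodesToFind matches.
func findNodesThatFit(snap *snapshot, preds []predicate, numNodesToFind int) []*Node {
	if len(preds) == 0 {
		return snap.ListNodes()
	}
	var filtered []*Node
	for _, n := range snap.ListNodes() {
		fits := true
		for _, p := range preds {
			if !p(n) {
				fits = false
				break
			}
		}
		if fits {
			filtered = append(filtered, n)
			if len(filtered) >= numNodesToFind {
				break
			}
		}
	}
	return filtered
}

func main() {
	snap := &snapshot{nodes: map[string]*Node{"a": {Name: "a"}, "b": {Name: "b"}}}
	fmt.Println(len(findNodesThatFit(snap, nil, 100))) // 2: fast path returns all nodes
}
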
@ -1205,21 +1204,21 @@ func (g *genericScheduler) selectVictimsOnNode(
// nodesWherePreemptionMightHelp returns a list of nodes with failed predicates
// that may be satisfied by removing pods from the node.
func nodesWherePreemptionMightHelp(nodes []*v1.Node, fitErr *FitError) []*v1.Node {
func nodesWherePreemptionMightHelp(nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, fitErr *FitError) []*v1.Node {
potentialNodes := []*v1.Node{}
for _, node := range nodes {
if fitErr.FilteredNodesStatuses[node.Name].Code() == framework.UnschedulableAndUnresolvable {
for name, node := range nodeNameToInfo {
if fitErr.FilteredNodesStatuses[name].Code() == framework.UnschedulableAndUnresolvable {
continue
}
failedPredicates, _ := fitErr.FailedPredicates[node.Name]
failedPredicates, _ := fitErr.FailedPredicates[name]
// If we assume that scheduler looks at all nodes and populates the failedPredicateMap
// (which is the case today), the !found case should never happen, but we'd prefer
// to rely less on such assumptions in the code when checking does not impose
// significant overhead.
// Also, we currently assume all failures returned by extender as resolvable.
if predicates.UnresolvablePredicateExists(failedPredicates) == nil {
klog.V(3).Infof("Node %v is a potential node for preemption.", node.Name)
potentialNodes = append(potentialNodes, node)
klog.V(3).Infof("Node %v is a potential node for preemption.", name)
potentialNodes = append(potentialNodes, node.Node())
}
}
return potentialNodes
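
The rewritten helper walks the snapshot's name-to-NodeInfo map and recovers each *v1.Node through NodeInfo.Node(), instead of ranging over a plain node slice. A compact sketch of the same shape, with the status map and the unresolvable-predicate check collapsed into one stand-in map (simplified types, not the real ones):

// Illustrative sketch only: simplified stand-ins for v1.Node, NodeInfo and FitError.
package main

import "fmt"

type Node struct{ Name string }

type NodeInfo struct{ node *Node }

// Node returns the wrapped node, as schedulernodeinfo.NodeInfo.Node() does.
func (n *NodeInfo) Node() *Node { return n.node }

type FitError struct {
	// Unresolvable marks nodes whose failure preemption cannot fix; it stands in
	// for both UnschedulableAndUnresolvable statuses and unresolvable predicates.
	Unresolvable map[string]bool
}

// nodesWherePreemptionMightHelp mirrors the map-based loop in the hunk: skip
// nodes whose failure evicting pods cannot fix, collect the rest.
func nodesWherePreemptionMightHelp(nodeNameToInfo map[string]*NodeInfo, fitErr *FitError) []*Node {
	potential := []*Node{}
	for name, info := range nodeNameToInfo {
		if fitErr.Unresolvable[name] {
			continue
		}
		potential = append(potential, info.Node())
	}
	return potential
}

func main() {
	infos := map[string]*NodeInfo{
		"node-1": {node: &Node{Name: "node-1"}},
		"node-2": {node: &Node{Name: "node-2"}},
	}
	fitErr := &FitError{Unresolvable: map[string]bool{"node-2": true}}
	fmt.Println(len(nodesWherePreemptionMightHelp(infos, fitErr))) // 1
}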

View File

@ -999,7 +999,7 @@ func TestZeroRequest(t *testing.T) {
list, err := PrioritizeNodes(
test.pod, nodeNameToInfo, metaData, priorityConfigs,
schedulertesting.FakeNodeLister(test.nodes), []algorithm.SchedulerExtender{}, emptyFramework, framework.NewCycleState())
test.nodes, []algorithm.SchedulerExtender{}, emptyFramework, framework.NewCycleState())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -1795,7 +1795,7 @@ func TestNodesWherePreemptionMightHelp(t *testing.T) {
FailedPredicates: test.failedPredMap,
FilteredNodesStatuses: test.nodesStatuses,
}
nodes := nodesWherePreemptionMightHelp(makeNodeList(nodeNames), &fitErr)
nodes := nodesWherePreemptionMightHelp(schedulernodeinfo.CreateNodeNameToInfoMap(nil, makeNodeList(nodeNames)), &fitErr)
if len(test.expected) != len(nodes) {
t.Errorf("number of nodes is not the same as expected. exptectd: %d, got: %d. Nodes: %v", len(test.expected), len(nodes), nodes)
}
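
The test now hands the helper a name-to-NodeInfo map built from the node list (with nil pods, as in the call above) rather than the list itself. A sketch of what that map-building step amounts to, using a hypothetical buildNodeNameToInfoMap and the same simplified stand-ins as above rather than the real CreateNodeNameToInfoMap:

// Illustrative sketch only: simplified stand-ins, hypothetical helper name.
package main

import "fmt"

type Node struct{ Name string }

type NodeInfo struct{ node *Node }

// buildNodeNameToInfoMap keys a fresh NodeInfo per node by node name, which is
// all the test needs when no pods are passed.
func buildNodeNameToInfoMap(nodes []*Node) map[string]*NodeInfo {
	m := make(map[string]*NodeInfo, len(nodes))
	for _, n := range nodes {
		m[n.Name] = &NodeInfo{node: n}
	}
	return m
}

func main() {
	nodes := []*Node{{Name: "machine1"}, {Name: "machine2"}}
	m := buildNodeNameToInfoMap(nodes)
	fmt.Println(len(m), m["machine1"].node.Name)
}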

View File

@ -592,7 +592,6 @@ func (c *Configurator) getAlgorithmArgs() (*PluginFactoryArgs, *plugins.ConfigPr
ControllerLister: c.controllerLister,
ReplicaSetLister: c.replicaSetLister,
StatefulSetLister: c.statefulSetLister,
NodeLister: c.schedulerCache,
PDBLister: c.pdbLister,
NodeInfo: c.schedulerCache,
CSINodeInfo: c.schedulerCache,

View File

@ -705,22 +705,6 @@ func (cache *schedulerCache) GetNodeInfo(nodeName string) (*v1.Node, error) {
return n.info.Node(), nil
}
// ListNodes returns the cached list of nodes.
func (cache *schedulerCache) ListNodes() []*v1.Node {
cache.mu.RLock()
defer cache.mu.RUnlock()
nodes := make([]*v1.Node, 0, len(cache.nodes))
for _, node := range cache.nodes {
// Node info is sometimes not removed immediately. See schedulerCache.RemoveNode.
n := node.info.Node()
if n != nil {
nodes = append(nodes, n)
}
}
return nodes
}
func (cache *schedulerCache) GetCSINodeInfo(nodeName string) (*storagev1beta1.CSINode, error) {
cache.mu.RLock()
defer cache.mu.RUnlock()
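
The deleted method took the cache's read lock on every call and filtered out entries whose node had already been logically removed (see the RemoveNode note in the removed code). Listing from the per-cycle snapshot instead means the lock is paid when the snapshot is refreshed, and the rest of the scheduling cycle can read it without locking. A rough sketch of that pattern, with simplified stand-ins rather than the real schedulerCache:

// Illustrative sketch only: a simplified lock-protected cache and one-shot snapshot.
package main

import (
	"fmt"
	"sync"
)

type Node struct{ Name string }

// NodeInfo's node may still be nil while an entry is pending removal.
type NodeInfo struct{ node *Node }

// cache stands in for schedulerCache: shared, mutated by informers, lock-protected.
type cache struct {
	mu    sync.RWMutex
	nodes map[string]*NodeInfo
}

// Snapshot copies the usable node entries once, under the read lock.
func (c *cache) Snapshot() map[string]*NodeInfo {
	c.mu.RLock()
	defer c.mu.RUnlock()
	snap := make(map[string]*NodeInfo, len(c.nodes))
	for name, info := range c.nodes {
		if info != nil && info.node != nil { // skip entries pending removal
			snap[name] = info
		}
	}
	return snap
}

func main() {
	c := &cache{nodes: map[string]*NodeInfo{
		"node-1": {node: &Node{Name: "node-1"}},
		"node-2": {}, // pending removal, dropped at snapshot time
	}}
	snap := c.Snapshot()   // taken once at the start of a scheduling cycle
	fmt.Println(len(snap)) // 1; the cycle reads snap freely, without c.mu
}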

View File

@ -59,7 +59,6 @@ import (
// a pod might have changed its state (e.g. added and deleted) without delivering notification to the cache.
type Cache interface {
algorithm.PodLister
algorithm.NodeLister
// AssumePod assumes a pod scheduled and aggregates the pod's information into its node.
// The implementation also decides the policy to expire pod before being confirmed (receiving Add event).

View File

@ -16,6 +16,10 @@ limitations under the License.
package nodeinfo
import (
v1 "k8s.io/api/core/v1"
)
// Snapshot is a snapshot of cache NodeInfo. The scheduler takes a
// snapshot at the beginning of each scheduling cycle and uses it for its
// operations in that cycle.
@ -30,3 +34,14 @@ func NewSnapshot() *Snapshot {
NodeInfoMap: make(map[string]*NodeInfo),
}
}
// ListNodes returns the list of nodes in the snapshot.
func (s *Snapshot) ListNodes() []*v1.Node {
nodes := make([]*v1.Node, 0, len(s.NodeInfoMap))
for _, n := range s.NodeInfoMap {
if n != nil && n.node != nil {
nodes = append(nodes, n.node)
}
}
return nodes
}
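
The new accessor gives the per-cycle snapshot the listing behavior the cache used to provide, including skipping entries whose node pointer is nil. A small usage sketch with simplified stand-ins (not the real pkg/scheduler/nodeinfo types):

// Illustrative sketch only: simplified stand-ins for v1.Node, NodeInfo and Snapshot.
package main

import "fmt"

type Node struct{ Name string }

type NodeInfo struct{ node *Node }

// Snapshot mirrors the shape above: a name-to-NodeInfo map captured per cycle.
type Snapshot struct {
	NodeInfoMap map[string]*NodeInfo
}

// ListNodes returns the nodes in the snapshot, skipping nil entries, as in the
// method added by this commit.
func (s *Snapshot) ListNodes() []*Node {
	nodes := make([]*Node, 0, len(s.NodeInfoMap))
	for _, n := range s.NodeInfoMap {
		if n != nil && n.node != nil {
			nodes = append(nodes, n.node)
		}
	}
	return nodes
}

func main() {
	snap := &Snapshot{NodeInfoMap: map[string]*NodeInfo{
		"node-1": {node: &Node{Name: "node-1"}},
		"node-2": nil, // tolerated: ListNodes skips it
	}}
	// What used to be g.cache.ListNodes() is now a read of the cycle's snapshot:
	fmt.Println(len(snap.ListNodes())) // 1
}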

View File

@ -28,16 +28,6 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm"
)
var _ algorithm.NodeLister = &FakeNodeLister{}
// FakeNodeLister implements NodeLister on a []string for test purposes.
type FakeNodeLister []*v1.Node
// ListNodes returns nodes as a []*v1.Node.
func (f FakeNodeLister) ListNodes() []*v1.Node {
return f
}
var _ algorithm.PodLister = &FakePodLister{}
// FakePodLister implements PodLister on an []v1.Pods for test purposes.