Check for old NodeInfo when updating equiv. cache.

Because the scheduler takes a snapshot of cache data at the start of
each scheduling cycle, updates to the equivalence cache should be
skipped if there was a cache update during the cycle.

If the current NodeInfo becomes stale while we evaluate predicates, we
will not write any results into the equivalence cache. We will still use
the results for the current scheduling cycle, though.
Jonathan Basseri 2018-04-23 14:24:00 -07:00
parent ca7bfc02ee
commit dacc1a8d52
2 changed files with 62 additions and 55 deletions
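
In other words, the predicate is still evaluated and its result is still used to filter nodes in the current cycle; only the write-back into the equivalence cache is made conditional on the node snapshot being current. A minimal Go sketch of that pattern, using simplified stand-in types rather than the real scheduler structs (only the IsUpToDate / UpdateCachedPredicateItem names are taken from the diff below):

package main

import "fmt"

// nodeSnapshot is a simplified stand-in for the scheduler's cached NodeInfo.
type nodeSnapshot struct{ name string }

// staleChecker is a simplified stand-in for the scheduler cache: all the hot
// path needs from it is an answer to "is this snapshot still current?".
type staleChecker interface {
	IsUpToDate(n *nodeSnapshot) bool
}

// equivCache is a simplified stand-in for the equivalence cache.
type equivCache struct{ results map[string]bool }

func (e *equivCache) UpdateCachedPredicateItem(node string, fit bool) {
	e.results[node] = fit
}

// evalPredicate mirrors the shape of the change in podFitsOnNode: the predicate
// result is always returned for use in the current scheduling cycle, but it is
// written into the equivalence cache only when the node snapshot is up to date.
func evalPredicate(cache staleChecker, ecache *equivCache, info *nodeSnapshot) bool {
	fit := true // pretend the predicate passed

	// Skip update if NodeInfo is stale.
	if cache != nil && cache.IsUpToDate(info) {
		ecache.UpdateCachedPredicateItem(info.name, fit)
	}
	return fit
}

// mapCache marks each node as up to date or not.
type mapCache struct{ upToDate map[string]bool }

func (c *mapCache) IsUpToDate(n *nodeSnapshot) bool { return c.upToDate[n.name] }

func main() {
	ecache := &equivCache{results: map[string]bool{}}
	cache := &mapCache{upToDate: map[string]bool{"node-1": false, "node-2": true}}

	// node-1 changed mid-cycle: the result is still used, but nothing is cached.
	fmt.Println(evalPredicate(cache, ecache, &nodeSnapshot{"node-1"}), ecache.results)
	// node-2 is still current: the result is cached for future cycles.
	fmt.Println(evalPredicate(cache, ecache, &nodeSnapshot{"node-2"}), ecache.results)
}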


@@ -128,7 +128,7 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister
     trace.Step("Computing predicates")
     startPredicateEvalTime := time.Now()
-    filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer, g.equivalenceCache, g.schedulingQueue, g.alwaysCheckAllPredicates)
+    filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
     if err != nil {
         return "", err
     }
@@ -325,21 +325,11 @@ func (g *genericScheduler) getLowerPriorityNominatedPods(pod *v1.Pod, nodeName s
 // Filters the nodes to find the ones that fit based on the given predicate functions
 // Each node is passed through the predicate functions to determine if it is a fit
-func findNodesThatFit(
-    pod *v1.Pod,
-    nodeNameToInfo map[string]*schedulercache.NodeInfo,
-    nodes []*v1.Node,
-    predicateFuncs map[string]algorithm.FitPredicate,
-    extenders []algorithm.SchedulerExtender,
-    metadataProducer algorithm.PredicateMetadataProducer,
-    ecache *EquivalenceCache,
-    schedulingQueue SchedulingQueue,
-    alwaysCheckAllPredicates bool,
-) ([]*v1.Node, FailedPredicateMap, error) {
+func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
     var filtered []*v1.Node
     failedPredicateMap := FailedPredicateMap{}
-    if len(predicateFuncs) == 0 {
+    if len(g.predicates) == 0 {
         filtered = nodes
     } else {
         // Create filtered list with enough space to avoid growing it
@@ -350,12 +340,12 @@ func findNodesThatFit(
         var filteredLen int32
         // We can use the same metadata producer for all nodes.
-        meta := metadataProducer(pod, nodeNameToInfo)
+        meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap)
         var equivCacheInfo *equivalenceClassInfo
-        if ecache != nil {
+        if g.equivalenceCache != nil {
             // getEquivalenceClassInfo will return immediately if no equivalence pod found
-            equivCacheInfo = ecache.getEquivalenceClassInfo(pod)
+            equivCacheInfo = g.equivalenceCache.getEquivalenceClassInfo(pod)
         }
         checkNode := func(i int) {
@@ -363,11 +353,12 @@ func findNodesThatFit(
             fits, failedPredicates, err := podFitsOnNode(
                 pod,
                 meta,
-                nodeNameToInfo[nodeName],
-                predicateFuncs,
-                ecache,
-                schedulingQueue,
-                alwaysCheckAllPredicates,
+                g.cachedNodeInfoMap[nodeName],
+                g.predicates,
+                g.cache,
+                g.equivalenceCache,
+                g.schedulingQueue,
+                g.alwaysCheckAllPredicates,
                 equivCacheInfo,
             )
             if err != nil {
@@ -391,12 +382,12 @@ func findNodesThatFit(
         }
     }
-    if len(filtered) > 0 && len(extenders) != 0 {
-        for _, extender := range extenders {
+    if len(filtered) > 0 && len(g.extenders) != 0 {
+        for _, extender := range g.extenders {
             if !extender.IsInterested(pod) {
                 continue
             }
-            filteredList, failedMap, err := extender.Filter(pod, filtered, nodeNameToInfo)
+            filteredList, failedMap, err := extender.Filter(pod, filtered, g.cachedNodeInfoMap)
             if err != nil {
                 if extender.IsIgnorable() {
                     glog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
@@ -467,6 +458,7 @@ func podFitsOnNode(
     meta algorithm.PredicateMetadata,
     info *schedulercache.NodeInfo,
     predicateFuncs map[string]algorithm.FitPredicate,
+    cache schedulercache.Cache,
     ecache *EquivalenceCache,
     queue SchedulingQueue,
     alwaysCheckAllPredicates bool,
@@ -548,12 +540,15 @@ func podFitsOnNode(
                 } else {
                     predicateResults[predicateKey] = HostPredicate{Fit: fit, FailReasons: reasons}
                 }
+                // Skip update if NodeInfo is stale.
+                if cache != nil && cache.IsUpToDate(info) {
                     result := predicateResults[predicateKey]
                     ecache.UpdateCachedPredicateItem(
                         pod.GetName(), info.Node().GetName(),
                         predicateKey, result.Fit, result.FailReasons, equivCacheInfo.hash, false)
+                }
             }
         }()
         if err != nil {
@@ -976,7 +971,7 @@ func selectVictimsOnNode(
     // that we should check is if the "pod" is failing to schedule due to pod affinity
     // failure.
     // TODO(bsalamat): Consider checking affinity to lower priority pods if feasible with reasonable performance.
-    if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil); !fits {
+    if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, nil, queue, false, nil); !fits {
         if err != nil {
             glog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
         }
@@ -990,7 +985,7 @@ func selectVictimsOnNode(
     violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims.Items, pdbs)
     reprievePod := func(p *v1.Pod) bool {
         addPod(p)
-        fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil)
+        fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, nil, queue, false, nil)
         if !fits {
             removePod(p)
             victims = append(victims, p)
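
The hot path above gates the write on cache.IsUpToDate(info), whose implementation is not part of this diff. A plausible sketch, assuming the scheduler cache keeps a per-node generation that is bumped on every update; the type and method names here are illustrative, not the real schedulercache API:

package main

import (
	"fmt"
	"sync"
)

// nodeInfo stands in for schedulercache.NodeInfo: a snapshot of one node plus
// the generation it was taken at. (Illustrative only, not the real struct.)
type nodeInfo struct {
	name       string
	generation int64
}

// schedCache stands in for the scheduler cache. Every mutation of a node bumps
// that node's generation, so a snapshot is stale once the generations diverge.
type schedCache struct {
	mu    sync.Mutex
	nodes map[string]int64 // node name -> current generation
}

// updateNode records that the node changed (pod added/removed, node updated).
func (c *schedCache) updateNode(name string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.nodes[name]++
}

// isUpToDate reports whether the snapshot still matches the cache's view.
func (c *schedCache) isUpToDate(n *nodeInfo) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	gen, ok := c.nodes[n.name]
	return ok && gen == n.generation
}

func main() {
	c := &schedCache{nodes: map[string]int64{"node-1": 0}}
	snapshot := &nodeInfo{name: "node-1", generation: 0}

	fmt.Println(c.isUpToDate(snapshot)) // true: nothing changed since the snapshot
	c.updateNode("node-1")              // e.g. a pod was bound to the node mid-cycle
	fmt.Println(c.isUpToDate(snapshot)) // false: the snapshot is now stale
}

Under an assumption like this, any change to the node between taking the snapshot and finishing predicate evaluation makes the snapshot stale, so the cycle still uses its results but never writes them into the equivalence cache. Note also that the preemption paths above pass nil for the cache, so they never update the equivalence cache.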


@@ -432,16 +432,35 @@ func TestGenericScheduler(t *testing.T) {
     }
 }
-func TestFindFitAllError(t *testing.T) {
+// makeScheduler makes a simple genericScheduler for testing.
+func makeScheduler(predicates map[string]algorithm.FitPredicate, nodes []*v1.Node) *genericScheduler {
     algorithmpredicates.SetPredicatesOrdering(order)
-    nodes := []string{"3", "2", "1"}
-    predicates := map[string]algorithm.FitPredicate{"true": truePredicate, "false": falsePredicate}
-    nodeNameToInfo := map[string]*schedulercache.NodeInfo{
-        "3": schedulercache.NewNodeInfo(),
-        "2": schedulercache.NewNodeInfo(),
-        "1": schedulercache.NewNodeInfo(),
+    cache := schedulercache.New(time.Duration(0), wait.NeverStop)
+    for _, n := range nodes {
+        cache.AddNode(n)
     }
-    _, predicateMap, err := findNodesThatFit(&v1.Pod{}, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyPredicateMetadataProducer, nil, nil, false)
+    prioritizers := []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}}
+    s := NewGenericScheduler(
+        cache,
+        nil,
+        NewSchedulingQueue(),
+        predicates,
+        algorithm.EmptyPredicateMetadataProducer,
+        prioritizers,
+        algorithm.EmptyPriorityMetadataProducer,
+        nil, nil, nil, false, false)
+    cache.UpdateNodeNameToInfoMap(s.(*genericScheduler).cachedNodeInfoMap)
+    return s.(*genericScheduler)
+}
+
+func TestFindFitAllError(t *testing.T) {
+    predicates := map[string]algorithm.FitPredicate{"true": truePredicate, "matches": matchesPredicate}
+    nodes := makeNodeList([]string{"3", "2", "1"})
+    scheduler := makeScheduler(predicates, nodes)
+    _, predicateMap, err := scheduler.findNodesThatFit(&v1.Pod{}, nodes)
     if err != nil {
         t.Errorf("unexpected error: %v", err)
@@ -452,9 +471,9 @@ func TestFindFitAllError(t *testing.T) {
     }
     for _, node := range nodes {
-        failures, found := predicateMap[node]
+        failures, found := predicateMap[node.Name]
         if !found {
-            t.Errorf("failed to find node: %s in %v", node, predicateMap)
+            t.Errorf("failed to find node: %s in %v", node.Name, predicateMap)
         }
         if len(failures) != 1 || failures[0] != algorithmpredicates.ErrFakePredicate {
             t.Errorf("unexpected failures: %v", failures)
@@ -463,20 +482,13 @@ func TestFindFitAllError(t *testing.T) {
 }
 func TestFindFitSomeError(t *testing.T) {
-    algorithmpredicates.SetPredicatesOrdering(order)
-    nodes := []string{"3", "2", "1"}
     predicates := map[string]algorithm.FitPredicate{"true": truePredicate, "matches": matchesPredicate}
-    pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1", UID: types.UID("1")}}
-    nodeNameToInfo := map[string]*schedulercache.NodeInfo{
-        "3": schedulercache.NewNodeInfo(),
-        "2": schedulercache.NewNodeInfo(),
-        "1": schedulercache.NewNodeInfo(pod),
-    }
-    for name := range nodeNameToInfo {
-        nodeNameToInfo[name].SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}})
-    }
-    _, predicateMap, err := findNodesThatFit(pod, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyPredicateMetadataProducer, nil, nil, false)
+    nodes := makeNodeList([]string{"3", "2", "1"})
+    scheduler := makeScheduler(predicates, nodes)
+    pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1", UID: types.UID("1")}}
+    _, predicateMap, err := scheduler.findNodesThatFit(pod, nodes)
     if err != nil {
         t.Errorf("unexpected error: %v", err)
     }
@@ -486,12 +498,12 @@ func TestFindFitSomeError(t *testing.T) {
     }
     for _, node := range nodes {
-        if node == pod.Name {
+        if node.Name == pod.Name {
             continue
         }
-        failures, found := predicateMap[node]
+        failures, found := predicateMap[node.Name]
         if !found {
-            t.Errorf("failed to find node: %s in %v", node, predicateMap)
+            t.Errorf("failed to find node: %s in %v", node.Name, predicateMap)
         }
         if len(failures) != 1 || failures[0] != algorithmpredicates.ErrFakePredicate {
             t.Errorf("unexpected failures: %v", failures)