Break DS controller dependency on scheduler predicates and predicate errors

Abdullah Gharaibeh 2019-12-30 12:58:28 -05:00
parent ce2102f363
commit 7d604c318c
8 changed files with 85 additions and 157 deletions

View File

@@ -241,6 +241,8 @@
"k8s.io/kubernetes/pkg/registry/core/secret",
"k8s.io/kubernetes/pkg/scheduler/algorithm",
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates",
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper",
"k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1",
"k8s.io/kubernetes/pkg/scheduler/nodeinfo",
"k8s.io/kubernetes/pkg/serviceaccount",
"k8s.io/kubernetes/pkg/util/goroutinemap",

View File

@@ -16,9 +16,10 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/controller/daemon",
deps = [
"//pkg/api/v1/pod:go_default_library",
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/daemon/util:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/framework/plugins/helper:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/util/labels:go_default_library",
"//staging/src/k8s.io/api/apps/v1:go_default_library",

View File

@@ -48,9 +48,10 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/component-base/metrics/prometheus/ratelimiter"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/daemon/util"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
pluginhelper "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"k8s.io/utils/integer"
)
@@ -622,7 +623,7 @@ func (dsc *DaemonSetsController) addNode(obj interface{}) {
}
node := obj.(*v1.Node)
for _, ds := range dsList {
_, shouldSchedule, _, err := dsc.nodeShouldRunDaemonPod(node, ds)
shouldSchedule, _, err := dsc.nodeShouldRunDaemonPod(node, ds)
if err != nil {
continue
}
@@ -684,11 +685,11 @@ func (dsc *DaemonSetsController) updateNode(old, cur interface{}) {
}
// TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too).
for _, ds := range dsList {
_, oldShouldSchedule, oldShouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(oldNode, ds)
oldShouldSchedule, oldShouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(oldNode, ds)
if err != nil {
continue
}
_, currentShouldSchedule, currentShouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(curNode, ds)
currentShouldSchedule, currentShouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(curNode, ds)
if err != nil {
continue
}
@@ -788,7 +789,7 @@ func (dsc *DaemonSetsController) podsShouldBeOnNode(
ds *apps.DaemonSet,
) (nodesNeedingDaemonPods, podsToDelete []string, err error) {
_, shouldSchedule, shouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(node, ds)
shouldSchedule, shouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(node, ds)
if err != nil {
return
}
@@ -1053,7 +1054,7 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *apps.DaemonSet, nodeL
var desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable int
for _, node := range nodeList {
wantToRun, _, _, err := dsc.nodeShouldRunDaemonPod(node, ds)
wantToRun, _, err := dsc.nodeShouldRunDaemonPod(node, ds)
if err != nil {
return err
}
@@ -1192,102 +1193,53 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
return dsc.updateDaemonSetStatus(ds, nodeList, hash, true)
}
func (dsc *DaemonSetsController) simulate(newPod *v1.Pod, node *v1.Node, ds *apps.DaemonSet) ([]predicates.PredicateFailureReason, *schedulernodeinfo.NodeInfo, error) {
objects, err := dsc.podNodeIndex.ByIndex("nodeName", node.Name)
if err != nil {
return nil, nil, err
}
nodeInfo := schedulernodeinfo.NewNodeInfo()
nodeInfo.SetNode(node)
for _, obj := range objects {
// Ignore pods that belong to the daemonset when taking into account whether a daemonset should bind to a node.
pod, ok := obj.(*v1.Pod)
if !ok {
continue
}
if metav1.IsControlledBy(pod, ds) {
continue
}
nodeInfo.AddPod(pod)
}
_, reasons, err := Predicates(newPod, nodeInfo)
return reasons, nodeInfo, err
}
// nodeShouldRunDaemonPod checks a set of preconditions against a (node,daemonset) and returns a
// summary. Returned booleans are:
// * shouldSchedule:
// Returns true when a daemonset should be scheduled to a node if a daemonset pod is not already
// running on that node.
// * shouldContinueRunning:
// Returns true when a daemonset should continue running on a node if a daemonset pod is already
// running on that node.
func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *apps.DaemonSet) (wantToRun, shouldSchedule, shouldContinueRunning bool, err error) {
newPod := NewPod(ds, node.Name)
func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *apps.DaemonSet) (bool, bool, error) {
pod := NewPod(ds, node.Name)
// Because these bools require an && of all their required conditions, we start
// with all bools set to true and set a bool to false if a condition is not met.
// A bool should probably not be set to true after this line.
wantToRun, shouldSchedule, shouldContinueRunning = true, true, true
// If the daemon set specifies a node name, check that it matches with node.Name.
if !(ds.Spec.Template.Spec.NodeName == "" || ds.Spec.Template.Spec.NodeName == node.Name) {
return false, false, false, nil
return false, false, nil
}
reasons, nodeInfo, err := dsc.simulate(newPod, node, ds)
nodeInfo := schedulernodeinfo.NewNodeInfo()
nodeInfo.SetNode(node)
taints, err := nodeInfo.Taints()
if err != nil {
klog.Warningf("DaemonSet Predicates failed on node %s for ds '%s/%s' due to unexpected error: %v", node.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Name, err)
return false, false, false, err
klog.Warningf("failed to get node %q taints: %v", node.Name, err)
return false, false, err
}
// TODO(k82cn): When 'ScheduleDaemonSetPods' upgrade to beta or GA, remove unnecessary check on failure reason,
// e.g. InsufficientResourceError; and simplify "wantToRun, shouldSchedule, shouldContinueRunning"
// into one result, e.g. selectedNode.
for _, r := range reasons {
klog.V(4).Infof("DaemonSet Predicates failed on node %s for ds '%s/%s' for reason: %v", node.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Name, r.GetReason())
switch reason := r.(type) {
case *predicates.PredicateFailureError:
// we try to partition predicates into two partitions here: intentional on the part of the operator and not.
switch reason {
// intentional
case
predicates.ErrNodeSelectorNotMatch,
predicates.ErrPodNotMatchHostName,
predicates.ErrNodeLabelPresenceViolated,
// this one is probably intentional since it's a workaround for not having
// pod hard anti affinity.
predicates.ErrPodNotFitsHostPorts:
return false, false, false, nil
case predicates.ErrTaintsTolerationsNotMatch:
// DaemonSet is expected to respect taints and tolerations
fitsNoExecute, _, err := predicates.PodToleratesNodeNoExecuteTaints(newPod, nil, nodeInfo)
if err != nil {
return false, false, false, err
}
if !fitsNoExecute {
return false, false, false, nil
}
wantToRun, shouldSchedule = false, false
// unexpected
case
predicates.ErrPodAffinityNotMatch,
predicates.ErrServiceAffinityViolated:
klog.Warningf("unexpected predicate failure reason: %s", reason.GetReason())
return false, false, false, fmt.Errorf("unexpected reason: DaemonSet Predicates should not return reason %s", reason.GetReason())
default:
klog.V(4).Infof("unknown predicate failure reason: %s", reason.GetReason())
wantToRun, shouldSchedule, shouldContinueRunning = false, false, false
dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedPlacementReason, "failed to place pod on %q: %s", node.ObjectMeta.Name, reason.GetReason())
}
}
fitsNodeName, fitsNodeAffinity, fitsTaints := PodFitsNode(pod, node, taints)
if !fitsNodeName || !fitsNodeAffinity {
return false, false, nil
}
if !fitsTaints {
fitsNoExecuteTaint := v1helper.TolerationsTolerateTaintsWithFilter(pod.Spec.Tolerations, taints, func(t *v1.Taint) bool {
return t.Effect == v1.TaintEffectNoExecute
})
return false, fitsNoExecuteTaint, nil
}
return true, true, nil
}
// PodFitsNode checks if a DaemonSet's pod can be scheduled on a node.
func PodFitsNode(pod *v1.Pod, node *v1.Node, taints []v1.Taint) (fitsNodeName, fitsNodeAffinity, fitsTaints bool) {
fitsNodeName = len(pod.Spec.NodeName) == 0 || pod.Spec.NodeName == node.Name
fitsNodeAffinity = pluginhelper.PodMatchesNodeSelectorAndAffinityTerms(pod, node)
fitsTaints = v1helper.TolerationsTolerateTaintsWithFilter(pod.Spec.Tolerations, taints, func(t *v1.Taint) bool {
return t.Effect == v1.TaintEffectNoExecute || t.Effect == v1.TaintEffectNoSchedule
})
return
}
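
The new helpers are enough to reconstruct the controller's decision outside the diff. A minimal sketch of the contract, not part of this commit: the node, DaemonSet, and labels below are made up. NoSchedule taints block shouldSchedule but not shouldContinueRunning, and only NoExecute taints have to be tolerated for a daemon pod to keep running.

package main

import (
	"fmt"

	apps "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
	"k8s.io/kubernetes/pkg/controller/daemon"
	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)

func main() {
	// A node with a NoSchedule taint that the daemon pod does not tolerate.
	node := &v1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: "node-1", Labels: map[string]string{"disk": "ssd"}},
		Spec: v1.NodeSpec{
			Taints: []v1.Taint{{Key: "maintenance", Effect: v1.TaintEffectNoSchedule}},
		},
	}
	ds := &apps.DaemonSet{
		ObjectMeta: metav1.ObjectMeta{Name: "logger", Namespace: "default"},
		Spec: apps.DaemonSetSpec{
			Template: v1.PodTemplateSpec{
				Spec: v1.PodSpec{NodeSelector: map[string]string{"disk": "ssd"}},
			},
		},
	}

	pod := daemon.NewPod(ds, node.Name)

	nodeInfo := schedulernodeinfo.NewNodeInfo()
	nodeInfo.SetNode(node)
	taints, err := nodeInfo.Taints()
	if err != nil {
		panic(err)
	}

	fitsNodeName, fitsNodeAffinity, fitsTaints := daemon.PodFitsNode(pod, node, taints)

	// NoSchedule taints prevent new placements but do not evict running pods,
	// so continuing to run only requires tolerating the NoExecute taints.
	toleratesNoExecute := v1helper.TolerationsTolerateTaintsWithFilter(pod.Spec.Tolerations, taints, func(t *v1.Taint) bool {
		return t.Effect == v1.TaintEffectNoExecute
	})

	shouldSchedule := fitsNodeName && fitsNodeAffinity && fitsTaints
	shouldContinueRunning := fitsNodeName && fitsNodeAffinity && toleratesNoExecute

	fmt.Println(shouldSchedule, shouldContinueRunning) // false true for this node
}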
@@ -1303,55 +1255,6 @@ func NewPod(ds *apps.DaemonSet, nodeName string) *v1.Pod {
return newPod
}
// checkNodeFitness runs a set of predicates that select candidate nodes for the DaemonSet;
// the predicates include:
// - PodFitsHost: checks pod's NodeName against node
// - PodMatchNodeSelector: checks pod's NodeSelector and NodeAffinity against node
// - PodToleratesNodeTaints: exclude tainted node unless pod has specific toleration
func checkNodeFitness(pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []predicates.PredicateFailureReason, error) {
var predicateFails []predicates.PredicateFailureReason
fit, reasons, err := predicates.PodFitsHost(pod, nil, nodeInfo)
if err != nil {
return false, predicateFails, err
}
if !fit {
predicateFails = append(predicateFails, reasons...)
}
fit, reasons, err = predicates.PodMatchNodeSelector(pod, nil, nodeInfo)
if err != nil {
return false, predicateFails, err
}
if !fit {
predicateFails = append(predicateFails, reasons...)
}
fit, reasons, err = predicates.PodToleratesNodeTaints(pod, nil, nodeInfo)
if err != nil {
return false, predicateFails, err
}
if !fit {
predicateFails = append(predicateFails, reasons...)
}
return len(predicateFails) == 0, predicateFails, nil
}
// Predicates checks if a DaemonSet's pod can be scheduled on a node using GeneralPredicates
// and PodToleratesNodeTaints predicate
func Predicates(pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []predicates.PredicateFailureReason, error) {
var predicateFails []predicates.PredicateFailureReason
fit, reasons, err := checkNodeFitness(pod, nodeInfo)
if err != nil {
return false, predicateFails, err
}
if !fit {
predicateFails = append(predicateFails, reasons...)
}
return len(predicateFails) == 0, predicateFails, nil
}
type podByCreationTimestampAndPhase []*v1.Pod
func (o podByCreationTimestampAndPhase) Len() int { return len(o) }
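
Code outside the controller that called the removed Predicates helper can get the same answer from PodFitsNode by extracting the taints from the NodeInfo first; the e2e change at the end of this commit does exactly this. A sketch of the migration, with a hypothetical canSchedule wrapper:

package example

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/controller/daemon"
	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)

// canSchedule mirrors the removed Predicates helper: it reports whether a
// DaemonSet pod fits the node held by nodeInfo.
func canSchedule(newPod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) (bool, error) {
	taints, err := nodeInfo.Taints()
	if err != nil {
		return false, err
	}
	fitsNodeName, fitsNodeAffinity, fitsTaints := daemon.PodFitsNode(newPod, nodeInfo.Node(), taints)
	return fitsNodeName && fitsNodeAffinity && fitsTaints, nil
}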

View File

@@ -1539,9 +1539,8 @@ func setDaemonSetCritical(ds *apps.DaemonSet) {
}
func TestNodeShouldRunDaemonPod(t *testing.T) {
var shouldCreate, wantToRun, shouldContinueRunning bool
var shouldCreate, shouldContinueRunning bool
shouldCreate = true
wantToRun = true
shouldContinueRunning = true
cases := []struct {
predicateName string
@@ -1565,7 +1564,6 @@ func TestNodeShouldRunDaemonPod(t *testing.T) {
},
},
},
wantToRun: true,
shouldCreate: true,
shouldContinueRunning: true,
},
@@ -1582,7 +1580,6 @@ func TestNodeShouldRunDaemonPod(t *testing.T) {
},
},
},
wantToRun: true,
shouldCreate: shouldCreate,
shouldContinueRunning: true,
},
@@ -1599,7 +1596,6 @@ func TestNodeShouldRunDaemonPod(t *testing.T) {
},
},
},
wantToRun: false,
shouldCreate: false,
shouldContinueRunning: false,
},
@@ -1633,7 +1629,6 @@ func TestNodeShouldRunDaemonPod(t *testing.T) {
},
},
},
wantToRun: wantToRun,
shouldCreate: shouldCreate,
shouldContinueRunning: shouldContinueRunning,
},
@@ -1662,7 +1657,6 @@ func TestNodeShouldRunDaemonPod(t *testing.T) {
},
},
},
wantToRun: true,
shouldCreate: shouldCreate, // This is because we don't care about the resource constraints any more and let default scheduler handle it.
shouldContinueRunning: true,
},
@@ -1691,7 +1685,6 @@ func TestNodeShouldRunDaemonPod(t *testing.T) {
},
},
},
wantToRun: true,
shouldCreate: true,
shouldContinueRunning: true,
},
@@ -1710,7 +1703,6 @@ func TestNodeShouldRunDaemonPod(t *testing.T) {
},
},
},
wantToRun: false,
shouldCreate: false,
shouldContinueRunning: false,
},
@@ -1729,7 +1721,6 @@ func TestNodeShouldRunDaemonPod(t *testing.T) {
},
},
},
wantToRun: true,
shouldCreate: true,
shouldContinueRunning: true,
},
@@ -1764,7 +1755,6 @@ func TestNodeShouldRunDaemonPod(t *testing.T) {
},
},
},
wantToRun: false,
shouldCreate: false,
shouldContinueRunning: false,
},
@@ -1799,7 +1789,6 @@ func TestNodeShouldRunDaemonPod(t *testing.T) {
},
},
},
wantToRun: true,
shouldCreate: true,
shouldContinueRunning: true,
},
@@ -1817,7 +1806,6 @@ func TestNodeShouldRunDaemonPod(t *testing.T) {
},
},
nodeUnschedulable: true,
wantToRun: true,
shouldCreate: true,
shouldContinueRunning: true,
},
@@ -1840,11 +1828,8 @@ func TestNodeShouldRunDaemonPod(t *testing.T) {
manager.podNodeIndex.Add(p)
}
c.ds.Spec.UpdateStrategy = *strategy
wantToRun, shouldRun, shouldContinueRunning, err := manager.nodeShouldRunDaemonPod(node, c.ds)
shouldRun, shouldContinueRunning, err := manager.nodeShouldRunDaemonPod(node, c.ds)
if wantToRun != c.wantToRun {
t.Errorf("[%v] strategy: %v, predicateName: %v expected wantToRun: %v, got: %v", i, c.ds.Spec.UpdateStrategy.Type, c.predicateName, c.wantToRun, wantToRun)
}
if shouldRun != c.shouldCreate {
t.Errorf("[%v] strategy: %v, predicateName: %v expected shouldRun: %v, got: %v", i, c.ds.Spec.UpdateStrategy.Type, c.predicateName, c.shouldCreate, shouldRun)
}

View File

@@ -389,7 +389,7 @@ func (dsc *DaemonSetsController) getUnavailableNumbers(ds *apps.DaemonSet, nodeL
var numUnavailable, desiredNumberScheduled int
for i := range nodeList {
node := nodeList[i]
wantToRun, _, _, err := dsc.nodeShouldRunDaemonPod(node, ds)
wantToRun, _, err := dsc.nodeShouldRunDaemonPod(node, ds)
if err != nil {
return -1, -1, err
}

View File

@@ -2,10 +2,17 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["normalize_score.go"],
srcs = [
"node_affinity.go",
"normalize_score.go",
],
importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper",
visibility = ["//visibility:public"],
deps = ["//pkg/scheduler/framework/v1alpha1:go_default_library"],
deps = [
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
],
)
go_test(

View File

@@ -0,0 +1,29 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package helper
import (
v1 "k8s.io/api/core/v1"
predicates "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
)
// PodMatchesNodeSelectorAndAffinityTerms checks whether the pod is schedulable onto nodes according to
// the requirements in both NodeAffinity and nodeSelector.
func PodMatchesNodeSelectorAndAffinityTerms(pod *v1.Pod, node *v1.Node) bool {
// TODO(ahg-g): actually move the logic here.
return predicates.PodMatchesNodeSelectorAndAffinityTerms(pod, node)
}
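
The wrapper currently just delegates to the predicates package. A usage sketch, with an illustrative pod, node, and zone label that are not taken from this commit:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	pluginhelper "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
)

func main() {
	// A pod that requires nodes labelled zone=us-east-1a via nodeSelector.
	pod := &v1.Pod{
		Spec: v1.PodSpec{
			NodeSelector: map[string]string{"zone": "us-east-1a"},
		},
	}
	node := &v1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name:   "node-1",
			Labels: map[string]string{"zone": "us-east-1a"},
		},
	}
	// True: the node carries the required label and the pod declares no
	// conflicting NodeAffinity terms.
	fmt.Println(pluginhelper.PodMatchesNodeSelectorAndAffinityTerms(pod, node))
}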

View File

@@ -688,12 +688,13 @@ func canScheduleOnNode(node v1.Node, ds *appsv1.DaemonSet) bool {
newPod := daemon.NewPod(ds, node.Name)
nodeInfo := schedulernodeinfo.NewNodeInfo()
nodeInfo.SetNode(&node)
fit, _, err := daemon.Predicates(newPod, nodeInfo)
taints, err := nodeInfo.Taints()
if err != nil {
framework.Failf("Can't test DaemonSet predicates for node %s: %v", node.Name, err)
return false
}
return fit
fitsNodeName, fitsNodeAffinity, fitsTaints := daemon.PodFitsNode(newPod, &node, taints)
return fitsNodeName && fitsNodeAffinity && fitsTaints
}
func checkRunningOnNoNodes(f *framework.Framework, ds *appsv1.DaemonSet) func() (bool, error) {