From 7d604c318cf956612c9c96143256ced1d841f121 Mon Sep 17 00:00:00 2001
From: Abdullah Gharaibeh
Date: Mon, 30 Dec 2019 12:58:28 -0500
Subject: [PATCH] Break DS controller dependency on scheduler predicates and
 predicate errors

---
 pkg/controller/.import-restrictions           |   2 +
 pkg/controller/daemon/BUILD                   |   3 +-
 pkg/controller/daemon/daemon_controller.go    | 171 ++++--------------
 .../daemon/daemon_controller_test.go          |  19 +-
 pkg/controller/daemon/update.go               |   2 +-
 pkg/scheduler/framework/plugins/helper/BUILD  |  11 +-
 .../framework/plugins/helper/node_affinity.go |  29 +++
 test/e2e/apps/daemon_set.go                   |   5 +-
 8 files changed, 85 insertions(+), 157 deletions(-)
 create mode 100644 pkg/scheduler/framework/plugins/helper/node_affinity.go

diff --git a/pkg/controller/.import-restrictions b/pkg/controller/.import-restrictions
index 3d07bfe4afc..9bf72f98703 100644
--- a/pkg/controller/.import-restrictions
+++ b/pkg/controller/.import-restrictions
@@ -241,6 +241,8 @@
         "k8s.io/kubernetes/pkg/registry/core/secret",
         "k8s.io/kubernetes/pkg/scheduler/algorithm",
         "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates",
+        "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper",
+        "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1",
         "k8s.io/kubernetes/pkg/scheduler/nodeinfo",
         "k8s.io/kubernetes/pkg/serviceaccount",
         "k8s.io/kubernetes/pkg/util/goroutinemap",
diff --git a/pkg/controller/daemon/BUILD b/pkg/controller/daemon/BUILD
index 0516c6bde5a..fb3cce43aba 100644
--- a/pkg/controller/daemon/BUILD
+++ b/pkg/controller/daemon/BUILD
@@ -16,9 +16,10 @@ go_library(
     importpath = "k8s.io/kubernetes/pkg/controller/daemon",
     deps = [
         "//pkg/api/v1/pod:go_default_library",
+        "//pkg/apis/core/v1/helper:go_default_library",
         "//pkg/controller:go_default_library",
         "//pkg/controller/daemon/util:go_default_library",
-        "//pkg/scheduler/algorithm/predicates:go_default_library",
+        "//pkg/scheduler/framework/plugins/helper:go_default_library",
         "//pkg/scheduler/nodeinfo:go_default_library",
         "//pkg/util/labels:go_default_library",
         "//staging/src/k8s.io/api/apps/v1:go_default_library",
diff --git a/pkg/controller/daemon/daemon_controller.go b/pkg/controller/daemon/daemon_controller.go
index 2f3152998bc..a3e8c4a751e 100644
--- a/pkg/controller/daemon/daemon_controller.go
+++ b/pkg/controller/daemon/daemon_controller.go
@@ -48,9 +48,10 @@ import (
 	"k8s.io/client-go/util/workqueue"
 	"k8s.io/component-base/metrics/prometheus/ratelimiter"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
+	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
 	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/controller/daemon/util"
-	"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
+	pluginhelper "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
 	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
 	"k8s.io/utils/integer"
 )
@@ -622,7 +623,7 @@ func (dsc *DaemonSetsController) addNode(obj interface{}) {
 	}
 	node := obj.(*v1.Node)
 	for _, ds := range dsList {
-		_, shouldSchedule, _, err := dsc.nodeShouldRunDaemonPod(node, ds)
+		shouldSchedule, _, err := dsc.nodeShouldRunDaemonPod(node, ds)
 		if err != nil {
 			continue
 		}
@@ -684,11 +685,11 @@ func (dsc *DaemonSetsController) updateNode(old, cur interface{}) {
 	}
 	// TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too).
 	for _, ds := range dsList {
-		_, oldShouldSchedule, oldShouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(oldNode, ds)
+		oldShouldSchedule, oldShouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(oldNode, ds)
 		if err != nil {
 			continue
 		}
-		_, currentShouldSchedule, currentShouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(curNode, ds)
+		currentShouldSchedule, currentShouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(curNode, ds)
 		if err != nil {
 			continue
 		}
@@ -788,7 +789,7 @@ func (dsc *DaemonSetsController) podsShouldBeOnNode(
 	ds *apps.DaemonSet,
 ) (nodesNeedingDaemonPods, podsToDelete []string, err error) {

-	_, shouldSchedule, shouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(node, ds)
+	shouldSchedule, shouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(node, ds)
 	if err != nil {
 		return
 	}
@@ -1053,7 +1054,7 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *apps.DaemonSet, nodeL
 	var desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable int
 	for _, node := range nodeList {
-		wantToRun, _, _, err := dsc.nodeShouldRunDaemonPod(node, ds)
+		wantToRun, _, err := dsc.nodeShouldRunDaemonPod(node, ds)
 		if err != nil {
 			return err
 		}

@@ -1192,102 +1193,53 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
 	return dsc.updateDaemonSetStatus(ds, nodeList, hash, true)
 }

-func (dsc *DaemonSetsController) simulate(newPod *v1.Pod, node *v1.Node, ds *apps.DaemonSet) ([]predicates.PredicateFailureReason, *schedulernodeinfo.NodeInfo, error) {
-	objects, err := dsc.podNodeIndex.ByIndex("nodeName", node.Name)
-	if err != nil {
-		return nil, nil, err
-	}
-
-	nodeInfo := schedulernodeinfo.NewNodeInfo()
-	nodeInfo.SetNode(node)
-
-	for _, obj := range objects {
-		// Ignore pods that belong to the daemonset when taking into account whether a daemonset should bind to a node.
-		pod, ok := obj.(*v1.Pod)
-		if !ok {
-			continue
-		}
-		if metav1.IsControlledBy(pod, ds) {
-			continue
-		}
-		nodeInfo.AddPod(pod)
-	}
-
-	_, reasons, err := Predicates(newPod, nodeInfo)
-	return reasons, nodeInfo, err
-}
-
 // nodeShouldRunDaemonPod checks a set of preconditions against a (node,daemonset) and returns a
 // summary. Returned booleans are:
-// * wantToRun:
-//     Returns true when a user would expect a pod to run on this node and ignores conditions
-//     such as DiskPressure or insufficient resource that would cause a daemonset pod not to schedule.
-//     This is primarily used to populate daemonset status.
 // * shouldSchedule:
 //     Returns true when a daemonset should be scheduled to a node if a daemonset pod is not already
 //     running on that node.
 // * shouldContinueRunning:
 //     Returns true when a daemonset should continue running on a node if a daemonset pod is already
 //     running on that node.
-func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *apps.DaemonSet) (wantToRun, shouldSchedule, shouldContinueRunning bool, err error) {
-	newPod := NewPod(ds, node.Name)
+func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *apps.DaemonSet) (bool, bool, error) {
+	pod := NewPod(ds, node.Name)

-	// Because these bools require an && of all their required conditions, we start
-	// with all bools set to true and set a bool to false if a condition is not met.
-	// A bool should probably not be set to true after this line.
-	wantToRun, shouldSchedule, shouldContinueRunning = true, true, true
 	// If the daemon set specifies a node name, check that it matches with node.Name.
 	if !(ds.Spec.Template.Spec.NodeName == "" || ds.Spec.Template.Spec.NodeName == node.Name) {
-		return false, false, false, nil
+		return false, false, nil
 	}

-	reasons, nodeInfo, err := dsc.simulate(newPod, node, ds)
+	nodeInfo := schedulernodeinfo.NewNodeInfo()
+	nodeInfo.SetNode(node)
+	taints, err := nodeInfo.Taints()
 	if err != nil {
-		klog.Warningf("DaemonSet Predicates failed on node %s for ds '%s/%s' due to unexpected error: %v", node.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Name, err)
-		return false, false, false, err
+		klog.Warningf("failed to get node %q taints: %v", node.Name, err)
+		return false, false, err
 	}

-	// TODO(k82cn): When 'ScheduleDaemonSetPods' upgrade to beta or GA, remove unnecessary check on failure reason,
-	// e.g. InsufficientResourceError; and simplify "wantToRun, shouldSchedule, shouldContinueRunning"
-	// into one result, e.g. selectedNode.
-	for _, r := range reasons {
-		klog.V(4).Infof("DaemonSet Predicates failed on node %s for ds '%s/%s' for reason: %v", node.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Name, r.GetReason())
-		switch reason := r.(type) {
-		case *predicates.PredicateFailureError:
-			// we try to partition predicates into two partitions here: intentional on the part of the operator and not.
-			switch reason {
-			// intentional
-			case
-				predicates.ErrNodeSelectorNotMatch,
-				predicates.ErrPodNotMatchHostName,
-				predicates.ErrNodeLabelPresenceViolated,
-				// this one is probably intentional since it's a workaround for not having
-				// pod hard anti affinity.
-				predicates.ErrPodNotFitsHostPorts:
-				return false, false, false, nil
-			case predicates.ErrTaintsTolerationsNotMatch:
-				// DaemonSet is expected to respect taints and tolerations
-				fitsNoExecute, _, err := predicates.PodToleratesNodeNoExecuteTaints(newPod, nil, nodeInfo)
-				if err != nil {
-					return false, false, false, err
-				}
-				if !fitsNoExecute {
-					return false, false, false, nil
-				}
-				wantToRun, shouldSchedule = false, false
-			// unexpected
-			case
-				predicates.ErrPodAffinityNotMatch,
-				predicates.ErrServiceAffinityViolated:
-				klog.Warningf("unexpected predicate failure reason: %s", reason.GetReason())
-				return false, false, false, fmt.Errorf("unexpected reason: DaemonSet Predicates should not return reason %s", reason.GetReason())
-			default:
-				klog.V(4).Infof("unknown predicate failure reason: %s", reason.GetReason())
-				wantToRun, shouldSchedule, shouldContinueRunning = false, false, false
-				dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedPlacementReason, "failed to place pod on %q: %s", node.ObjectMeta.Name, reason.GetReason())
-			}
-		}
+	fitsNodeName, fitsNodeAffinity, fitsTaints := PodFitsNode(pod, node, taints)
+	if !fitsNodeName || !fitsNodeAffinity {
+		return false, false, nil
 	}
+
+	if !fitsTaints {
+		fitsNoExecuteTaint := v1helper.TolerationsTolerateTaintsWithFilter(pod.Spec.Tolerations, taints, func(t *v1.Taint) bool {
+			return t.Effect == v1.TaintEffectNoExecute
+		})
+
+		return false, fitsNoExecuteTaint, nil
+	}
+
+	return true, true, nil
+}
+
+// PodFitsNode checks if a DaemonSet's pod can be scheduled on a node.
+func PodFitsNode(pod *v1.Pod, node *v1.Node, taints []v1.Taint) (fitsNodeName, fitsNodeAffinity, fitsTaints bool) {
+	fitsNodeName = len(pod.Spec.NodeName) == 0 || pod.Spec.NodeName == node.Name
+	fitsNodeAffinity = pluginhelper.PodMatchesNodeSelectorAndAffinityTerms(pod, node)
+	fitsTaints = v1helper.TolerationsTolerateTaintsWithFilter(pod.Spec.Tolerations, taints, func(t *v1.Taint) bool {
+		return t.Effect == v1.TaintEffectNoExecute || t.Effect == v1.TaintEffectNoSchedule
+	})
 	return
 }
@@ -1303,55 +1255,6 @@ func NewPod(ds *apps.DaemonSet, nodeName string) *v1.Pod {
 	return newPod
 }

-// checkNodeFitness runs a set of predicates that select candidate nodes for the DaemonSet;
-// the predicates include:
-//   - PodFitsHost: checks pod's NodeName against node
-//   - PodMatchNodeSelector: checks pod's NodeSelector and NodeAffinity against node
-//   - PodToleratesNodeTaints: exclude tainted node unless pod has specific toleration
-func checkNodeFitness(pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []predicates.PredicateFailureReason, error) {
-	var predicateFails []predicates.PredicateFailureReason
-	fit, reasons, err := predicates.PodFitsHost(pod, nil, nodeInfo)
-	if err != nil {
-		return false, predicateFails, err
-	}
-	if !fit {
-		predicateFails = append(predicateFails, reasons...)
-	}
-
-	fit, reasons, err = predicates.PodMatchNodeSelector(pod, nil, nodeInfo)
-	if err != nil {
-		return false, predicateFails, err
-	}
-	if !fit {
-		predicateFails = append(predicateFails, reasons...)
-	}
-
-	fit, reasons, err = predicates.PodToleratesNodeTaints(pod, nil, nodeInfo)
-	if err != nil {
-		return false, predicateFails, err
-	}
-	if !fit {
-		predicateFails = append(predicateFails, reasons...)
-	}
-	return len(predicateFails) == 0, predicateFails, nil
-}
-
-// Predicates checks if a DaemonSet's pod can be scheduled on a node using GeneralPredicates
-// and PodToleratesNodeTaints predicate
-func Predicates(pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []predicates.PredicateFailureReason, error) {
-	var predicateFails []predicates.PredicateFailureReason
-
-	fit, reasons, err := checkNodeFitness(pod, nodeInfo)
-	if err != nil {
-		return false, predicateFails, err
-	}
-	if !fit {
-		predicateFails = append(predicateFails, reasons...)
-	}
-
-	return len(predicateFails) == 0, predicateFails, nil
-}
-
 type podByCreationTimestampAndPhase []*v1.Pod

 func (o podByCreationTimestampAndPhase) Len() int { return len(o) }
diff --git a/pkg/controller/daemon/daemon_controller_test.go b/pkg/controller/daemon/daemon_controller_test.go
index 6e5f8c5f446..6d9fc2ccda1 100644
--- a/pkg/controller/daemon/daemon_controller_test.go
+++ b/pkg/controller/daemon/daemon_controller_test.go
@@ -1539,9 +1539,8 @@ func setDaemonSetCritical(ds *apps.DaemonSet) {
 }

 func TestNodeShouldRunDaemonPod(t *testing.T) {
-	var shouldCreate, wantToRun, shouldContinueRunning bool
+	var shouldCreate, shouldContinueRunning bool
 	shouldCreate = true
-	wantToRun = true
 	shouldContinueRunning = true
 	cases := []struct {
 		predicateName string
@@ -1565,7 +1564,6 @@
 					},
 				},
 			},
-			wantToRun:             true,
 			shouldCreate:          true,
 			shouldContinueRunning: true,
 		},
@@ -1582,7 +1580,6 @@
 					},
 				},
 			},
-			wantToRun:             true,
 			shouldCreate:          shouldCreate,
 			shouldContinueRunning: true,
 		},
@@ -1599,7 +1596,6 @@
 					},
 				},
 			},
-			wantToRun:             false,
 			shouldCreate:          false,
 			shouldContinueRunning: false,
 		},
@@ -1633,7 +1629,6 @@
 					},
 				},
 			},
-			wantToRun:             wantToRun,
 			shouldCreate:          shouldCreate,
 			shouldContinueRunning: shouldContinueRunning,
 		},
@@ -1662,7 +1657,6 @@
 					},
 				},
 			},
-			wantToRun:             true,
 			shouldCreate:          shouldCreate, // This is because we don't care about the resource constraints any more and let default scheduler handle it.
 			shouldContinueRunning: true,
 		},
@@ -1691,7 +1685,6 @@
 					},
 				},
 			},
-			wantToRun:             true,
 			shouldCreate:          true,
 			shouldContinueRunning: true,
 		},
@@ -1710,7 +1703,6 @@
 					},
 				},
 			},
-			wantToRun:             false,
 			shouldCreate:          false,
 			shouldContinueRunning: false,
 		},
@@ -1729,7 +1721,6 @@
 					},
 				},
 			},
-			wantToRun:             true,
 			shouldCreate:          true,
 			shouldContinueRunning: true,
 		},
@@ -1764,7 +1755,6 @@
 					},
 				},
 			},
-			wantToRun:             false,
 			shouldCreate:          false,
 			shouldContinueRunning: false,
 		},
@@ -1799,7 +1789,6 @@
 					},
 				},
 			},
-			wantToRun:             true,
 			shouldCreate:          true,
 			shouldContinueRunning: true,
 		},
@@ -1817,7 +1806,6 @@
 				},
 			},
 			nodeUnschedulable:     true,
-			wantToRun:             true,
 			shouldCreate:          true,
 			shouldContinueRunning: true,
 		},
@@ -1840,11 +1828,8 @@
 			manager.podNodeIndex.Add(p)
 		}
 		c.ds.Spec.UpdateStrategy = *strategy
-		wantToRun, shouldRun, shouldContinueRunning, err := manager.nodeShouldRunDaemonPod(node, c.ds)
+		shouldRun, shouldContinueRunning, err := manager.nodeShouldRunDaemonPod(node, c.ds)

-		if wantToRun != c.wantToRun {
-			t.Errorf("[%v] strategy: %v, predicateName: %v expected wantToRun: %v, got: %v", i, c.ds.Spec.UpdateStrategy.Type, c.predicateName, c.wantToRun, wantToRun)
-		}
 		if shouldRun != c.shouldCreate {
 			t.Errorf("[%v] strategy: %v, predicateName: %v expected shouldRun: %v, got: %v", i, c.ds.Spec.UpdateStrategy.Type, c.predicateName, c.shouldCreate, shouldRun)
 		}
diff --git a/pkg/controller/daemon/update.go b/pkg/controller/daemon/update.go
index 3172f23ff60..1c09a2e4a79 100644
--- a/pkg/controller/daemon/update.go
+++ b/pkg/controller/daemon/update.go
@@ -389,7 +389,7 @@ func (dsc *DaemonSetsController) getUnavailableNumbers(ds *apps.DaemonSet, nodeL
 	var numUnavailable, desiredNumberScheduled int
 	for i := range nodeList {
 		node := nodeList[i]
-		wantToRun, _, _, err := dsc.nodeShouldRunDaemonPod(node, ds)
+		wantToRun, _, err := dsc.nodeShouldRunDaemonPod(node, ds)
 		if err != nil {
 			return -1, -1, err
 		}
diff --git a/pkg/scheduler/framework/plugins/helper/BUILD b/pkg/scheduler/framework/plugins/helper/BUILD
index 2329f3cdc90..95a3f08dcf8 100644
--- a/pkg/scheduler/framework/plugins/helper/BUILD
+++ b/pkg/scheduler/framework/plugins/helper/BUILD
@@ -2,10 +2,17 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

 go_library(
     name = "go_default_library",
-    srcs = ["normalize_score.go"],
+    srcs = [
+        "node_affinity.go",
+        "normalize_score.go",
+    ],
     importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper",
     visibility = ["//visibility:public"],
-    deps = ["//pkg/scheduler/framework/v1alpha1:go_default_library"],
+    deps = [
+        "//pkg/scheduler/algorithm/predicates:go_default_library",
+        "//pkg/scheduler/framework/v1alpha1:go_default_library",
+        "//staging/src/k8s.io/api/core/v1:go_default_library",
+    ],
 )

 go_test(
diff --git a/pkg/scheduler/framework/plugins/helper/node_affinity.go b/pkg/scheduler/framework/plugins/helper/node_affinity.go
new file mode 100644
index 00000000000..4133a2ac792
--- /dev/null
+++ b/pkg/scheduler/framework/plugins/helper/node_affinity.go
@@ -0,0 +1,29 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package helper
+
+import (
+	v1 "k8s.io/api/core/v1"
+	predicates "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
+)
+
+// PodMatchesNodeSelectorAndAffinityTerms checks whether the pod is schedulable onto nodes according to
+// the requirements in both NodeAffinity and nodeSelector.
+func PodMatchesNodeSelectorAndAffinityTerms(pod *v1.Pod, node *v1.Node) bool {
+	// TODO(ahg-g): actually move the logic here.
+	return predicates.PodMatchesNodeSelectorAndAffinityTerms(pod, node)
+}
diff --git a/test/e2e/apps/daemon_set.go b/test/e2e/apps/daemon_set.go
index 3d9771cb0fe..330391f197a 100644
--- a/test/e2e/apps/daemon_set.go
+++ b/test/e2e/apps/daemon_set.go
@@ -688,12 +688,13 @@ func canScheduleOnNode(node v1.Node, ds *appsv1.DaemonSet) bool {
 	newPod := daemon.NewPod(ds, node.Name)
 	nodeInfo := schedulernodeinfo.NewNodeInfo()
 	nodeInfo.SetNode(&node)
-	fit, _, err := daemon.Predicates(newPod, nodeInfo)
+	taints, err := nodeInfo.Taints()
 	if err != nil {
 		framework.Failf("Can't test DaemonSet predicates for node %s: %v", node.Name, err)
 		return false
 	}
-	fitsNodeName, fitsNodeAffinity, fitsTaints := daemon.PodFitsNode(newPod, &node, taints)
+	return fitsNodeName && fitsNodeAffinity && fitsTaints
 }

 func checkRunningOnNoNodes(f *framework.Framework, ds *appsv1.DaemonSet) func() (bool, error) {
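
Reviewer note (illustration only, not part of the patch): the behavioral core of this change is the taint handling in the new nodeShouldRunDaemonPod. The standalone Go sketch below uses simplified stand-in types rather than the real v1.Taint and v1.Toleration API, and only mirrors the decision table: an untolerated NoSchedule taint blocks new placement but lets an existing daemon pod keep running, while an untolerated NoExecute taint blocks both.

package main

import "fmt"

// Simplified stand-ins for v1.Taint and v1.Toleration, just enough to
// mirror the taint logic in PodFitsNode and nodeShouldRunDaemonPod.
type taint struct {
	key    string
	effect string // "NoSchedule" or "NoExecute"
}

type toleration struct {
	key    string
	effect string // empty tolerates any effect for the key
}

// tolerated reports whether any toleration matches the taint, loosely
// mirroring v1helper.TolerationsTolerateTaintsWithFilter in this sketch.
func tolerated(t taint, tols []toleration) bool {
	for _, tol := range tols {
		if tol.key == t.key && (tol.effect == "" || tol.effect == t.effect) {
			return true
		}
	}
	return false
}

// shouldRun reproduces the decision table of the new nodeShouldRunDaemonPod:
// fitsTaints covers NoSchedule and NoExecute; when it fails, the pod may
// still keep running as long as every NoExecute taint is tolerated.
func shouldRun(taints []taint, tols []toleration) (shouldSchedule, shouldContinueRunning bool) {
	fitsTaints, fitsNoExecute := true, true
	for _, t := range taints {
		if tolerated(t, tols) {
			continue
		}
		fitsTaints = false
		if t.effect == "NoExecute" {
			fitsNoExecute = false
		}
	}
	if !fitsTaints {
		return false, fitsNoExecute
	}
	return true, true
}

func main() {
	noSchedule := []taint{{key: "maintenance", effect: "NoSchedule"}}
	noExecute := []taint{{key: "evict", effect: "NoExecute"}}

	s, c := shouldRun(noSchedule, nil)
	fmt.Printf("untolerated NoSchedule: schedule=%v continue=%v\n", s, c) // false true

	s, c = shouldRun(noExecute, nil)
	fmt.Printf("untolerated NoExecute:  schedule=%v continue=%v\n", s, c) // false false

	s, c = shouldRun(noExecute, []toleration{{key: "evict"}})
	fmt.Printf("tolerated NoExecute:    schedule=%v continue=%v\n", s, c) // true true
}

Keeping daemon pods running across untolerated NoSchedule taints preserves what the removed predicates path did for ErrTaintsTolerationsNotMatch, where only PodToleratesNodeNoExecuteTaints could force shouldContinueRunning to false.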