From 4c882477e93591ecbe0638c2ff55b7b7b7535186 Mon Sep 17 00:00:00 2001 From: Janet Kuo Date: Thu, 23 Feb 2017 17:14:46 -0800 Subject: [PATCH] Make DaemonSet respect critical pods annotation when scheduling --- pkg/controller/daemon/BUILD | 5 ++ pkg/controller/daemon/daemoncontroller.go | 28 +++++-- .../daemon/daemoncontroller_test.go | 79 +++++++++++++++++++ .../algorithm/predicates/predicates.go | 33 +++++++- 4 files changed, 137 insertions(+), 8 deletions(-) diff --git a/pkg/controller/daemon/BUILD b/pkg/controller/daemon/BUILD index 29875a7c552..1301d9f1043 100644 --- a/pkg/controller/daemon/BUILD +++ b/pkg/controller/daemon/BUILD @@ -28,6 +28,8 @@ go_library( "//pkg/client/listers/extensions/v1beta1:go_default_library", "//pkg/controller:go_default_library", "//pkg/controller/daemon/util:go_default_library", + "//pkg/features:go_default_library", + "//pkg/kubelet/types:go_default_library", "//pkg/util/metrics:go_default_library", "//plugin/pkg/scheduler/algorithm:go_default_library", "//plugin/pkg/scheduler/algorithm/predicates:go_default_library", @@ -40,6 +42,7 @@ go_library( "//vendor:k8s.io/apimachinery/pkg/util/intstr", "//vendor:k8s.io/apimachinery/pkg/util/runtime", "//vendor:k8s.io/apimachinery/pkg/util/wait", + "//vendor:k8s.io/apiserver/pkg/util/feature", "//vendor:k8s.io/client-go/kubernetes/typed/core/v1", "//vendor:k8s.io/client-go/pkg/api/v1", "//vendor:k8s.io/client-go/tools/cache", @@ -64,12 +67,14 @@ go_test( "//pkg/client/clientset_generated/clientset/fake:go_default_library", "//pkg/client/informers/informers_generated/externalversions:go_default_library", "//pkg/controller:go_default_library", + "//pkg/kubelet/types:go_default_library", "//pkg/securitycontext:go_default_library", "//vendor:k8s.io/apimachinery/pkg/api/resource", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/runtime", "//vendor:k8s.io/apimachinery/pkg/util/intstr", "//vendor:k8s.io/apiserver/pkg/storage/names", + "//vendor:k8s.io/apiserver/pkg/util/feature", "//vendor:k8s.io/client-go/testing", "//vendor:k8s.io/client-go/tools/cache", "//vendor:k8s.io/client-go/tools/record", diff --git a/pkg/controller/daemon/daemoncontroller.go b/pkg/controller/daemon/daemoncontroller.go index aaf3f71d29c..04f822c1f26 100644 --- a/pkg/controller/daemon/daemoncontroller.go +++ b/pkg/controller/daemon/daemoncontroller.go @@ -29,6 +29,7 @@ import ( utilerrors "k8s.io/apimachinery/pkg/util/errors" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" v1core "k8s.io/client-go/kubernetes/typed/core/v1" clientv1 "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/tools/cache" @@ -45,6 +46,8 @@ import ( extensionslisters "k8s.io/kubernetes/pkg/client/listers/extensions/v1beta1" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/daemon/util" + "k8s.io/kubernetes/pkg/features" + kubelettypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/util/metrics" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates" @@ -782,6 +785,11 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error { // Returns true when a daemonset should continue running on a node if a daemonset pod is already // running on that node. 
func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *extensions.DaemonSet) (wantToRun, shouldSchedule, shouldContinueRunning bool, err error) {
+	newPod := &v1.Pod{Spec: ds.Spec.Template.Spec, ObjectMeta: ds.Spec.Template.ObjectMeta}
+	newPod.Namespace = ds.Namespace
+	newPod.Spec.NodeName = node.Name
+	critical := utilfeature.DefaultFeatureGate.Enabled(features.ExperimentalCriticalPodAnnotation) && kubelettypes.IsCriticalPod(newPod)
+
 	// Because these bools require an && of all their required conditions, we start
 	// with all bools set to true and set a bool to false if a condition is not met.
 	// A bool should probably not be set to true after this line.
@@ -793,6 +801,11 @@ func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *exten
 	// TODO: Move it to the predicates
 	for _, c := range node.Status.Conditions {
+		if critical {
+			break
+		}
+		// TODO: There are other node statuses that the DaemonSet should ideally respect too,
+		// e.g. MemoryPressure and DiskPressure
 		if c.Type == v1.NodeOutOfDisk && c.Status == v1.ConditionTrue {
 			// the kubelet will evict this pod if it needs to. Let kubelet
 			// decide whether to continue running this pod so leave shouldContinueRunning
@@ -801,10 +814,6 @@ func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *exten
 		}
 	}
 
-	newPod := &v1.Pod{Spec: ds.Spec.Template.Spec, ObjectMeta: ds.Spec.Template.ObjectMeta}
-	newPod.Namespace = ds.Namespace
-	newPod.Spec.NodeName = node.Name
-
 	// DaemonSet pods shouldn't be deleted by NodeController in case of node problems.
 	// Add infinite toleration for taint notReady:NoExecute here
 	// to survive taint-based eviction enforced by NodeController
@@ -917,8 +926,9 @@ func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *exten
 // and PodToleratesNodeTaints predicate
 func daemonSetPredicates(pod *v1.Pod, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
 	var predicateFails []algorithm.PredicateFailureReason
+	critical := utilfeature.DefaultFeatureGate.Enabled(features.ExperimentalCriticalPodAnnotation) && kubelettypes.IsCriticalPod(pod)
 
-	fit, reasons, err := predicates.GeneralPredicates(pod, nil, nodeInfo)
+	fit, reasons, err := predicates.PodToleratesNodeTaints(pod, nil, nodeInfo)
 	if err != nil {
 		return false, predicateFails, err
 	}
@@ -926,7 +936,13 @@ func daemonSetPredicates(pod *v1.Pod, nodeInfo *schedulercache.NodeInfo) (bool,
 		predicateFails = append(predicateFails, reasons...)
 	}
 
-	fit, reasons, err = predicates.PodToleratesNodeTaints(pod, nil, nodeInfo)
+	if critical {
+		// If the pod is marked as critical and support for critical pod annotations is enabled,
+		// check predicates for critical pods only.
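+		// (Editorial note, illustrative: "critical pods only" means the resource-fit
+		// check in noncriticalPredicates is skipped for this pod; the essential
+		// checks for host name, host ports, and node selector still run below.)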
+		fit, reasons, err = predicates.EssentialPredicates(pod, nil, nodeInfo)
+	} else {
+		fit, reasons, err = predicates.GeneralPredicates(pod, nil, nodeInfo)
+	}
 	if err != nil {
 		return false, predicateFails, err
 	}
diff --git a/pkg/controller/daemon/daemoncontroller_test.go b/pkg/controller/daemon/daemoncontroller_test.go
index faac5fdc79f..4262357f177 100644
--- a/pkg/controller/daemon/daemoncontroller_test.go
+++ b/pkg/controller/daemon/daemoncontroller_test.go
@@ -25,6 +25,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apiserver/pkg/storage/names"
+	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	core "k8s.io/client-go/testing"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
@@ -35,6 +36,7 @@ import (
 	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
 	informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
 	"k8s.io/kubernetes/pkg/controller"
+	kubelettypes "k8s.io/kubernetes/pkg/kubelet/types"
 	"k8s.io/kubernetes/pkg/securitycontext"
 )
 
@@ -892,6 +894,83 @@ func setDaemonSetToleration(ds *extensions.DaemonSet, tolerations []v1.Toleratio
 	ds.Spec.Template.Spec.Tolerations = tolerations
 }
 
+// DaemonSet should launch a critical pod even when the node is OutOfDisk.
+func TestOutOfDiskNodeDaemonLaunchesCriticalPod(t *testing.T) {
+	manager, podControl, _ := newTestController()
+
+	node := newNode("not-enough-disk", nil)
+	node.Status.Conditions = []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue}}
+	manager.nodeStore.Add(node)
+
+	// Without enabling the critical pod annotation feature gate, we shouldn't create the critical pod
+	utilfeature.DefaultFeatureGate.Set("ExperimentalCriticalPodAnnotation=False")
+	ds := newDaemonSet("critical")
+	setDaemonSetCritical(ds)
+	manager.dsStore.Add(ds)
+	syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0)
+
+	// Enabling the critical pod annotation feature gate should create the critical pod
+	utilfeature.DefaultFeatureGate.Set("ExperimentalCriticalPodAnnotation=True")
+	syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0)
+}
+
+// DaemonSet should launch a critical pod even when the node has insufficient free resources.
+func TestInsufficientCapacityNodeDaemonLaunchesCriticalPod(t *testing.T) {
+	podSpec := resourcePodSpec("too-much-mem", "75M", "75m")
+	manager, podControl, _ := newTestController()
+	node := newNode("too-much-mem", nil)
+	node.Status.Allocatable = allocatableResources("100M", "200m")
+	manager.nodeStore.Add(node)
+	manager.podStore.Add(&v1.Pod{
+		Spec: podSpec,
+	})
+
+	// Without enabling the critical pod annotation feature gate, we shouldn't create the critical pod
+	utilfeature.DefaultFeatureGate.Set("ExperimentalCriticalPodAnnotation=False")
+	ds := newDaemonSet("critical")
+	ds.Spec.Template.Spec = podSpec
+	setDaemonSetCritical(ds)
+	manager.dsStore.Add(ds)
+	syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0)
+
+	// Enabling the critical pod annotation feature gate should create the critical pod
+	utilfeature.DefaultFeatureGate.Set("ExperimentalCriticalPodAnnotation=True")
+	syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0)
+}
+
+// DaemonSet should NOT launch a critical pod when there are port conflicts.
+func TestPortConflictNodeDaemonDoesNotLaunchCriticalPod(t *testing.T) { + podSpec := v1.PodSpec{ + NodeName: "port-conflict", + Containers: []v1.Container{{ + Ports: []v1.ContainerPort{{ + HostPort: 666, + }}, + }}, + } + manager, podControl, _ := newTestController() + node := newNode("port-conflict", nil) + manager.nodeStore.Add(node) + manager.podStore.Add(&v1.Pod{ + Spec: podSpec, + }) + + utilfeature.DefaultFeatureGate.Set("ExperimentalCriticalPodAnnotation=True") + ds := newDaemonSet("critical") + ds.Spec.Template.Spec = podSpec + setDaemonSetCritical(ds) + manager.dsStore.Add(ds) + syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0) +} + +func setDaemonSetCritical(ds *extensions.DaemonSet) { + ds.Namespace = api.NamespaceSystem + if ds.Spec.Template.ObjectMeta.Annotations == nil { + ds.Spec.Template.ObjectMeta.Annotations = make(map[string]string) + } + ds.Spec.Template.ObjectMeta.Annotations[kubelettypes.CriticalPodAnnotationKey] = "" +} + func TestNodeShouldRunDaemonPod(t *testing.T) { cases := []struct { podsOnNode []*v1.Pod diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates.go b/plugin/pkg/scheduler/algorithm/predicates/predicates.go index 07713cae589..4f1387ca085 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/predicates.go +++ b/plugin/pkg/scheduler/algorithm/predicates/predicates.go @@ -855,6 +855,28 @@ func haveSame(a1, a2 []string) bool { } func GeneralPredicates(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) { + var predicateFails []algorithm.PredicateFailureReason + fit, reasons, err := noncriticalPredicates(pod, meta, nodeInfo) + if err != nil { + return false, predicateFails, err + } + if !fit { + predicateFails = append(predicateFails, reasons...) + } + + fit, reasons, err = EssentialPredicates(pod, meta, nodeInfo) + if err != nil { + return false, predicateFails, err + } + if !fit { + predicateFails = append(predicateFails, reasons...) + } + + return len(predicateFails) == 0, predicateFails, nil +} + +// noncriticalPredicates are the predicates that only non-critical pods need +func noncriticalPredicates(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) { var predicateFails []algorithm.PredicateFailureReason fit, reasons, err := PodFitsResources(pod, meta, nodeInfo) if err != nil { @@ -864,7 +886,13 @@ func GeneralPredicates(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.N predicateFails = append(predicateFails, reasons...) } - fit, reasons, err = PodFitsHost(pod, meta, nodeInfo) + return len(predicateFails) == 0, predicateFails, nil +} + +// EssentialPredicates are the predicates that all pods, including critical pods, need +func EssentialPredicates(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) { + var predicateFails []algorithm.PredicateFailureReason + fit, reasons, err := PodFitsHost(pod, meta, nodeInfo) if err != nil { return false, predicateFails, err } @@ -872,6 +900,8 @@ func GeneralPredicates(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.N predicateFails = append(predicateFails, reasons...) 
} + // TODO: PodFitsHostPorts is essential for now, but kubelet should ideally + // preempt pods to free up host ports too fit, reasons, err = PodFitsHostPorts(pod, meta, nodeInfo) if err != nil { return false, predicateFails, err @@ -887,7 +917,6 @@ func GeneralPredicates(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.N if !fit { predicateFails = append(predicateFails, reasons...) } - return len(predicateFails) == 0, predicateFails, nil }
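
Reviewer note (illustrative, not part of the patch): the decision this patch adds is
that, with the ExperimentalCriticalPodAnnotation feature gate enabled, a pod in the
kube-system namespace carrying the scheduler.alpha.kubernetes.io/critical-pod
annotation is checked only against EssentialPredicates (host name, host ports, node
selector) plus PodToleratesNodeTaints, while the resource-fit check in
noncriticalPredicates is skipped. The standalone Go sketch below mirrors that branch
using simplified local stand-ins for v1.Pod, kubelettypes.IsCriticalPod, and the
predicate sets; it does not call the real Kubernetes packages.

package main

import "fmt"

// criticalPodAnnotationKey mirrors kubelettypes.CriticalPodAnnotationKey;
// presence of the key marks the pod as critical, its value is ignored.
const criticalPodAnnotationKey = "scheduler.alpha.kubernetes.io/critical-pod"

// pod is a simplified stand-in for v1.Pod with only the fields the check reads.
type pod struct {
	namespace   string
	annotations map[string]string
}

// isCriticalPod is a stand-in for kubelettypes.IsCriticalPod: a pod counts as
// critical when it runs in the kube-system namespace and carries the annotation.
func isCriticalPod(p pod) bool {
	_, annotated := p.annotations[criticalPodAnnotationKey]
	return p.namespace == "kube-system" && annotated
}

// daemonSetPredicates mirrors the branch introduced above: critical pods skip
// the noncritical resource-fit check but keep the essential checks.
func daemonSetPredicates(p pod, gateEnabled bool) string {
	if gateEnabled && isCriticalPod(p) {
		return "EssentialPredicates only (resource fit skipped)"
	}
	return "GeneralPredicates (noncritical + essential)"
}

func main() {
	critical := pod{
		namespace:   "kube-system",
		annotations: map[string]string{criticalPodAnnotationKey: ""},
	}
	ordinary := pod{namespace: "default"}

	fmt.Println("gate on,  critical pod:", daemonSetPredicates(critical, true))
	fmt.Println("gate on,  ordinary pod:", daemonSetPredicates(ordinary, true))
	fmt.Println("gate off, critical pod:", daemonSetPredicates(critical, false))
}

This split is exactly what the three tests above exercise: the OutOfDisk and
insufficient-capacity cases schedule the critical pod because only
noncriticalPredicates are bypassed, while the port-conflict case still refuses to
schedule because PodFitsHostPorts remains in EssentialPredicates.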