Mirror of https://github.com/k3s-io/kubernetes.git, synced 2025-07-22 19:31:44 +00:00
Merge pull request #42028 from janetkuo/ds-critical-pods
Automatic merge from submit-queue (batch tested with PRs 41234, 42186, 41615, 42028, 41788)

Make DaemonSet respect critical pods annotation when scheduling

**What this PR does / why we need it**: #41612

**Which issue this PR fixes**: fixes #41612

**Special notes for your reviewer**:

**Release note**:

```release-note
Make DaemonSet respect critical pods annotation when scheduling.
```

cc @kubernetes/sig-apps-feature-requests @erictune @vishh @liggitt @kargakis @lukaszo @piosz @davidopp
This commit is contained in:
commit 2b2c04e685
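For context, the controller treats a pod as critical only when the `ExperimentalCriticalPodAnnotation` feature gate is enabled and `kubelettypes.IsCriticalPod` returns true for the pod built from the DaemonSet template (see the `critical := ...` lines in the diff below). A minimal sketch of that check, using stand-in types rather than the real `v1.Pod` and `kubelettypes` API, and assuming the annotation key behind `kubelettypes.CriticalPodAnnotationKey` is `scheduler.alpha.kubernetes.io/critical-pod`:

```go
package main

import "fmt"

// Assumed value of kubelettypes.CriticalPodAnnotationKey; the diff only shows
// the constant's name, not its value.
const criticalPodAnnotationKey = "scheduler.alpha.kubernetes.io/critical-pod"

// pod is a minimal stand-in for v1.Pod, just enough for this sketch.
type pod struct {
    Namespace   string
    Annotations map[string]string
}

// isCriticalPod is an illustrative stand-in for kubelettypes.IsCriticalPod:
// only pods in kube-system that carry the critical-pod annotation qualify,
// which is exactly what the test helper setDaemonSetCritical below sets up.
func isCriticalPod(p *pod) bool {
    if p.Namespace != "kube-system" {
        return false
    }
    _, ok := p.Annotations[criticalPodAnnotationKey]
    return ok
}

func main() {
    p := &pod{
        Namespace:   "kube-system",
        Annotations: map[string]string{criticalPodAnnotationKey: ""},
    }
    fmt.Println(isCriticalPod(p)) // true
}
```

The tests further down drive the same gate via `utilfeature.DefaultFeatureGate.Set("ExperimentalCriticalPodAnnotation=True")`.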
@@ -28,6 +28,8 @@ go_library(
        "//pkg/client/listers/extensions/v1beta1:go_default_library",
        "//pkg/controller:go_default_library",
        "//pkg/controller/daemon/util:go_default_library",
        "//pkg/features:go_default_library",
        "//pkg/kubelet/types:go_default_library",
        "//pkg/util/metrics:go_default_library",
        "//plugin/pkg/scheduler/algorithm:go_default_library",
        "//plugin/pkg/scheduler/algorithm/predicates:go_default_library",
@@ -40,6 +42,7 @@ go_library(
        "//vendor:k8s.io/apimachinery/pkg/util/intstr",
        "//vendor:k8s.io/apimachinery/pkg/util/runtime",
        "//vendor:k8s.io/apimachinery/pkg/util/wait",
        "//vendor:k8s.io/apiserver/pkg/util/feature",
        "//vendor:k8s.io/client-go/kubernetes/typed/core/v1",
        "//vendor:k8s.io/client-go/pkg/api/v1",
        "//vendor:k8s.io/client-go/tools/cache",
@@ -64,12 +67,14 @@ go_test(
        "//pkg/client/clientset_generated/clientset/fake:go_default_library",
        "//pkg/client/informers/informers_generated/externalversions:go_default_library",
        "//pkg/controller:go_default_library",
        "//pkg/kubelet/types:go_default_library",
        "//pkg/securitycontext:go_default_library",
        "//vendor:k8s.io/apimachinery/pkg/api/resource",
        "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
        "//vendor:k8s.io/apimachinery/pkg/runtime",
        "//vendor:k8s.io/apimachinery/pkg/util/intstr",
        "//vendor:k8s.io/apiserver/pkg/storage/names",
        "//vendor:k8s.io/apiserver/pkg/util/feature",
        "//vendor:k8s.io/client-go/testing",
        "//vendor:k8s.io/client-go/tools/cache",
        "//vendor:k8s.io/client-go/tools/record",
@@ -29,6 +29,7 @@ import (
    utilerrors "k8s.io/apimachinery/pkg/util/errors"
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/apimachinery/pkg/util/wait"
    utilfeature "k8s.io/apiserver/pkg/util/feature"
    v1core "k8s.io/client-go/kubernetes/typed/core/v1"
    clientv1 "k8s.io/client-go/pkg/api/v1"
    "k8s.io/client-go/tools/cache"
@@ -45,6 +46,8 @@ import (
    extensionslisters "k8s.io/kubernetes/pkg/client/listers/extensions/v1beta1"
    "k8s.io/kubernetes/pkg/controller"
    "k8s.io/kubernetes/pkg/controller/daemon/util"
    "k8s.io/kubernetes/pkg/features"
    kubelettypes "k8s.io/kubernetes/pkg/kubelet/types"
    "k8s.io/kubernetes/pkg/util/metrics"
    "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
    "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
@@ -782,6 +785,11 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
// Returns true when a daemonset should continue running on a node if a daemonset pod is already
// running on that node.
func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *extensions.DaemonSet) (wantToRun, shouldSchedule, shouldContinueRunning bool, err error) {
    newPod := &v1.Pod{Spec: ds.Spec.Template.Spec, ObjectMeta: ds.Spec.Template.ObjectMeta}
    newPod.Namespace = ds.Namespace
    newPod.Spec.NodeName = node.Name
    critical := utilfeature.DefaultFeatureGate.Enabled(features.ExperimentalCriticalPodAnnotation) && kubelettypes.IsCriticalPod(newPod)

    // Because these bools require an && of all their required conditions, we start
    // with all bools set to true and set a bool to false if a condition is not met.
    // A bool should probably not be set to true after this line.
@@ -793,6 +801,11 @@ func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *exten

    // TODO: Move it to the predicates
    for _, c := range node.Status.Conditions {
        if critical {
            break
        }
        // TODO: There are other node status that the DaemonSet should ideally respect too,
        //       e.g. MemoryPressure, and DiskPressure
        if c.Type == v1.NodeOutOfDisk && c.Status == v1.ConditionTrue {
            // the kubelet will evict this pod if it needs to. Let kubelet
            // decide whether to continue running this pod so leave shouldContinueRunning
@@ -801,10 +814,6 @@ func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *exten
        }
    }

    newPod := &v1.Pod{Spec: ds.Spec.Template.Spec, ObjectMeta: ds.Spec.Template.ObjectMeta}
    newPod.Namespace = ds.Namespace
    newPod.Spec.NodeName = node.Name

    // DaemonSet pods shouldn't be deleted by NodeController in case of node problems.
    // Add infinite toleration for taint notReady:NoExecute here
    // to survive taint-based eviction enforced by NodeController
@@ -917,8 +926,9 @@ func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *exten
// and PodToleratesNodeTaints predicate
func daemonSetPredicates(pod *v1.Pod, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
    var predicateFails []algorithm.PredicateFailureReason
    critical := utilfeature.DefaultFeatureGate.Enabled(features.ExperimentalCriticalPodAnnotation) && kubelettypes.IsCriticalPod(pod)

    fit, reasons, err := predicates.GeneralPredicates(pod, nil, nodeInfo)
    fit, reasons, err := predicates.PodToleratesNodeTaints(pod, nil, nodeInfo)
    if err != nil {
        return false, predicateFails, err
    }
@@ -926,7 +936,13 @@ func daemonSetPredicates(pod *v1.Pod, nodeInfo *schedulercache.NodeInfo) (bool,
        predicateFails = append(predicateFails, reasons...)
    }

    fit, reasons, err = predicates.PodToleratesNodeTaints(pod, nil, nodeInfo)
    if critical {
        // If the pod is marked as critical and support for critical pod annotations is enabled,
        // check predicates for critical pods only.
        fit, reasons, err = predicates.EssentialPredicates(pod, nil, nodeInfo)
    } else {
        fit, reasons, err = predicates.GeneralPredicates(pod, nil, nodeInfo)
    }
    if err != nil {
        return false, predicateFails, err
    }
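Taken together, the controller changes above boil down to a simple decision: taint toleration is always checked, but critical pods bypass the node-condition loop and are screened with `EssentialPredicates` instead of the full `GeneralPredicates`. A condensed, self-contained sketch of that flow (the stub predicate functions are illustrative; the real ones take a pod, metadata, and node info and return failure reasons and errors):

```go
package main

import "fmt"

// Illustrative stubs for the scheduler predicates used in daemonSetPredicates.
func podToleratesNodeTaints() bool { return true }
func essentialPredicates() bool    { return true }  // host, host-port, and similar checks every pod must pass
func generalPredicates() bool      { return false } // also includes resource fit, which a full node fails

// fits condenses the new daemonSetPredicates logic: the taint check always
// applies, while the resource-style checks are skipped for critical pods.
func fits(critical bool) bool {
    if !podToleratesNodeTaints() {
        return false
    }
    if critical {
        return essentialPredicates()
    }
    return generalPredicates()
}

func main() {
    fmt.Println(fits(false)) // false: an ordinary daemon pod is rejected on this (full) node
    fmt.Println(fits(true))  // true: a critical daemon pod is still scheduled
}
```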
@@ -25,6 +25,7 @@ import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apiserver/pkg/storage/names"
    utilfeature "k8s.io/apiserver/pkg/util/feature"
    core "k8s.io/client-go/testing"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/record"
@@ -35,6 +36,7 @@ import (
    "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
    informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
    "k8s.io/kubernetes/pkg/controller"
    kubelettypes "k8s.io/kubernetes/pkg/kubelet/types"
    "k8s.io/kubernetes/pkg/securitycontext"
)

@@ -892,6 +894,83 @@ func setDaemonSetToleration(ds *extensions.DaemonSet, tolerations []v1.Toleratio
    ds.Spec.Template.Spec.Tolerations = tolerations
}

// DaemonSet should launch a critical pod even when the node is OutOfDisk.
func TestOutOfDiskNodeDaemonLaunchesCriticalPod(t *testing.T) {
    manager, podControl, _ := newTestController()

    node := newNode("not-enough-disk", nil)
    node.Status.Conditions = []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue}}
    manager.nodeStore.Add(node)

    // Without enabling critical pod annotation feature gate, we shouldn't create critical pod
    utilfeature.DefaultFeatureGate.Set("ExperimentalCriticalPodAnnotation=False")
    ds := newDaemonSet("critical")
    setDaemonSetCritical(ds)
    manager.dsStore.Add(ds)
    syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0)

    // Enabling critical pod annotation feature gate should create critical pod
    utilfeature.DefaultFeatureGate.Set("ExperimentalCriticalPodAnnotation=True")
    syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0)
}

// DaemonSet should launch a critical pod even when the node has insufficient free resource.
func TestInsufficientCapacityNodeDaemonLaunchesCriticalPod(t *testing.T) {
    podSpec := resourcePodSpec("too-much-mem", "75M", "75m")
    manager, podControl, _ := newTestController()
    node := newNode("too-much-mem", nil)
    node.Status.Allocatable = allocatableResources("100M", "200m")
    manager.nodeStore.Add(node)
    manager.podStore.Add(&v1.Pod{
        Spec: podSpec,
    })

    // Without enabling critical pod annotation feature gate, we shouldn't create critical pod
    utilfeature.DefaultFeatureGate.Set("ExperimentalCriticalPodAnnotation=False")
    ds := newDaemonSet("critical")
    ds.Spec.Template.Spec = podSpec
    setDaemonSetCritical(ds)
    manager.dsStore.Add(ds)
    syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0)

    // Enabling critical pod annotation feature gate should create critical pod
    utilfeature.DefaultFeatureGate.Set("ExperimentalCriticalPodAnnotation=True")
    syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0)
}

// DaemonSets should NOT launch a critical pod when there are port conflicts.
func TestPortConflictNodeDaemonDoesNotLaunchCriticalPod(t *testing.T) {
    podSpec := v1.PodSpec{
        NodeName: "port-conflict",
        Containers: []v1.Container{{
            Ports: []v1.ContainerPort{{
                HostPort: 666,
            }},
        }},
    }
    manager, podControl, _ := newTestController()
    node := newNode("port-conflict", nil)
    manager.nodeStore.Add(node)
    manager.podStore.Add(&v1.Pod{
        Spec: podSpec,
    })

    utilfeature.DefaultFeatureGate.Set("ExperimentalCriticalPodAnnotation=True")
    ds := newDaemonSet("critical")
    ds.Spec.Template.Spec = podSpec
    setDaemonSetCritical(ds)
    manager.dsStore.Add(ds)
    syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0)
}

func setDaemonSetCritical(ds *extensions.DaemonSet) {
    ds.Namespace = api.NamespaceSystem
    if ds.Spec.Template.ObjectMeta.Annotations == nil {
        ds.Spec.Template.ObjectMeta.Annotations = make(map[string]string)
    }
    ds.Spec.Template.ObjectMeta.Annotations[kubelettypes.CriticalPodAnnotationKey] = ""
}

func TestNodeShouldRunDaemonPod(t *testing.T) {
    cases := []struct {
        podsOnNode []*v1.Pod
@@ -855,6 +855,28 @@ func haveSame(a1, a2 []string) bool {
}

func GeneralPredicates(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
    var predicateFails []algorithm.PredicateFailureReason
    fit, reasons, err := noncriticalPredicates(pod, meta, nodeInfo)
    if err != nil {
        return false, predicateFails, err
    }
    if !fit {
        predicateFails = append(predicateFails, reasons...)
    }

    fit, reasons, err = EssentialPredicates(pod, meta, nodeInfo)
    if err != nil {
        return false, predicateFails, err
    }
    if !fit {
        predicateFails = append(predicateFails, reasons...)
    }

    return len(predicateFails) == 0, predicateFails, nil
}

// noncriticalPredicates are the predicates that only non-critical pods need
func noncriticalPredicates(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
    var predicateFails []algorithm.PredicateFailureReason
    fit, reasons, err := PodFitsResources(pod, meta, nodeInfo)
    if err != nil {
@@ -864,7 +886,13 @@ func GeneralPredicates(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.N
        predicateFails = append(predicateFails, reasons...)
    }

    fit, reasons, err = PodFitsHost(pod, meta, nodeInfo)
    return len(predicateFails) == 0, predicateFails, nil
}

// EssentialPredicates are the predicates that all pods, including critical pods, need
func EssentialPredicates(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
    var predicateFails []algorithm.PredicateFailureReason
    fit, reasons, err := PodFitsHost(pod, meta, nodeInfo)
    if err != nil {
        return false, predicateFails, err
    }
@@ -872,6 +900,8 @@ func GeneralPredicates(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.N
        predicateFails = append(predicateFails, reasons...)
    }

    // TODO: PodFitsHostPorts is essential for now, but kubelet should ideally
    //       preempt pods to free up host ports too
    fit, reasons, err = PodFitsHostPorts(pod, meta, nodeInfo)
    if err != nil {
        return false, predicateFails, err
@@ -887,7 +917,6 @@ func GeneralPredicates(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.N
    if !fit {
        predicateFails = append(predicateFails, reasons...)
    }

    return len(predicateFails) == 0, predicateFails, nil
}
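The predicates change is a refactor plus one new export: `GeneralPredicates` keeps its previous behavior but is now the conjunction of a non-critical group and the new `EssentialPredicates` group, which is what `daemonSetPredicates` calls directly for critical pods. Schematically, as a sketch with simplified boolean signatures (the real functions also take `meta` and `nodeInfo` and return failure reasons):

```go
package main

import "fmt"

// Illustrative stand-ins for the individual predicates called in the diff.
func podFitsResources() bool { return true }
func podFitsHost() bool      { return true }
func podFitsHostPorts() bool { return true }

// noncriticalPredicates: checks only non-critical pods need (resource fit).
func noncriticalPredicates() bool { return podFitsResources() }

// essentialPredicates: checks every pod must pass, critical or not. The hunks
// above also run one more check after PodFitsHostPorts whose call line is not
// shown in this diff, so it is elided from the sketch.
func essentialPredicates() bool { return podFitsHost() && podFitsHostPorts() }

// generalPredicates: unchanged external behavior, both groups must pass.
func generalPredicates() bool { return noncriticalPredicates() && essentialPredicates() }

func main() {
    fmt.Println(generalPredicates()) // true when every underlying check passes
}
```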