mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 11:21:47 +00:00
feat: graduate ScheduleDaemonSetPods to GA
This commit is contained in:
parent
b084336460
commit
35d772e354
@ -18,8 +18,6 @@ go_library(
|
|||||||
"//pkg/api/v1/pod:go_default_library",
|
"//pkg/api/v1/pod:go_default_library",
|
||||||
"//pkg/controller:go_default_library",
|
"//pkg/controller:go_default_library",
|
||||||
"//pkg/controller/daemon/util:go_default_library",
|
"//pkg/controller/daemon/util:go_default_library",
|
||||||
"//pkg/features:go_default_library",
|
|
||||||
"//pkg/kubelet/types:go_default_library",
|
|
||||||
"//pkg/scheduler/algorithm/predicates:go_default_library",
|
"//pkg/scheduler/algorithm/predicates:go_default_library",
|
||||||
"//pkg/scheduler/nodeinfo:go_default_library",
|
"//pkg/scheduler/nodeinfo:go_default_library",
|
||||||
"//pkg/util/labels:go_default_library",
|
"//pkg/util/labels:go_default_library",
|
||||||
@ -36,7 +34,6 @@ go_library(
|
|||||||
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
|
||||||
"//staging/src/k8s.io/client-go/informers/apps/v1:go_default_library",
|
"//staging/src/k8s.io/client-go/informers/apps/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
|
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
||||||
|
@ -26,7 +26,7 @@ import (
|
|||||||
"k8s.io/klog"
|
"k8s.io/klog"
|
||||||
|
|
||||||
apps "k8s.io/api/apps/v1"
|
apps "k8s.io/api/apps/v1"
|
||||||
"k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
@ -35,7 +35,6 @@ import (
|
|||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
|
||||||
appsinformers "k8s.io/client-go/informers/apps/v1"
|
appsinformers "k8s.io/client-go/informers/apps/v1"
|
||||||
coreinformers "k8s.io/client-go/informers/core/v1"
|
coreinformers "k8s.io/client-go/informers/core/v1"
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
@ -52,8 +51,6 @@ import (
|
|||||||
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
||||||
"k8s.io/kubernetes/pkg/controller"
|
"k8s.io/kubernetes/pkg/controller"
|
||||||
"k8s.io/kubernetes/pkg/controller/daemon/util"
|
"k8s.io/kubernetes/pkg/controller/daemon/util"
|
||||||
"k8s.io/kubernetes/pkg/features"
|
|
||||||
kubelettypes "k8s.io/kubernetes/pkg/kubelet/types"
|
|
||||||
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
|
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
|
||||||
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
|
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
|
||||||
"k8s.io/utils/integer"
|
"k8s.io/utils/integer"
|
||||||
@ -973,9 +970,7 @@ func (dsc *DaemonSetsController) manage(ds *apps.DaemonSet, nodeList []*v1.Node,
|
|||||||
|
|
||||||
// Remove unscheduled pods assigned to not existing nodes when daemonset pods are scheduled by scheduler.
|
// Remove unscheduled pods assigned to not existing nodes when daemonset pods are scheduled by scheduler.
|
||||||
// If node doesn't exist then pods are never scheduled and can't be deleted by PodGCController.
|
// If node doesn't exist then pods are never scheduled and can't be deleted by PodGCController.
|
||||||
if utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) {
|
|
||||||
podsToDelete = append(podsToDelete, getUnscheduledPodsWithoutNode(nodeList, nodeToDaemonPods)...)
|
podsToDelete = append(podsToDelete, getUnscheduledPodsWithoutNode(nodeList, nodeToDaemonPods)...)
|
||||||
}
|
|
||||||
|
|
||||||
// Label new pods using the hash label value of the current history when creating them
|
// Label new pods using the hash label value of the current history when creating them
|
||||||
if err = dsc.syncNodes(ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
|
if err = dsc.syncNodes(ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
|
||||||
@ -1033,25 +1028,16 @@ func (dsc *DaemonSetsController) syncNodes(ds *apps.DaemonSet, podsToDelete, nod
|
|||||||
for i := pos; i < pos+batchSize; i++ {
|
for i := pos; i < pos+batchSize; i++ {
|
||||||
go func(ix int) {
|
go func(ix int) {
|
||||||
defer createWait.Done()
|
defer createWait.Done()
|
||||||
var err error
|
|
||||||
|
|
||||||
podTemplate := template.DeepCopy()
|
podTemplate := template.DeepCopy()
|
||||||
if utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) {
|
|
||||||
// The pod's NodeAffinity will be updated to make sure the Pod is bound
|
// The pod's NodeAffinity will be updated to make sure the Pod is bound
|
||||||
// to the target node by default scheduler. It is safe to do so because there
|
// to the target node by default scheduler. It is safe to do so because there
|
||||||
// should be no conflicting node affinity with the target node.
|
// should be no conflicting node affinity with the target node.
|
||||||
podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity(
|
podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity(
|
||||||
podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix])
|
podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix])
|
||||||
|
|
||||||
err = dsc.podControl.CreatePodsWithControllerRef(ds.Namespace, podTemplate,
|
err := dsc.podControl.CreatePodsWithControllerRef(ds.Namespace, podTemplate,
|
||||||
ds, metav1.NewControllerRef(ds, controllerKind))
|
ds, metav1.NewControllerRef(ds, controllerKind))
|
||||||
} else {
|
|
||||||
// If pod is scheduled by DaemonSetController, set its '.spec.scheduleName'.
|
|
||||||
podTemplate.Spec.SchedulerName = "kubernetes.io/daemonset-controller"
|
|
||||||
|
|
||||||
err = dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, podTemplate,
|
|
||||||
ds, metav1.NewControllerRef(ds, controllerKind))
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil && errors.IsTimeout(err) {
|
if err != nil && errors.IsTimeout(err) {
|
||||||
// Pod is created but its initialization has timed out.
|
// Pod is created but its initialization has timed out.
|
||||||
@ -1355,14 +1341,10 @@ func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *apps.
|
|||||||
// TODO(k82cn): When 'ScheduleDaemonSetPods' upgrade to beta or GA, remove unnecessary check on failure reason,
|
// TODO(k82cn): When 'ScheduleDaemonSetPods' upgrade to beta or GA, remove unnecessary check on failure reason,
|
||||||
// e.g. InsufficientResourceError; and simplify "wantToRun, shouldSchedule, shouldContinueRunning"
|
// e.g. InsufficientResourceError; and simplify "wantToRun, shouldSchedule, shouldContinueRunning"
|
||||||
// into one result, e.g. selectedNode.
|
// into one result, e.g. selectedNode.
|
||||||
var insufficientResourceErr error
|
|
||||||
for _, r := range reasons {
|
for _, r := range reasons {
|
||||||
klog.V(4).Infof("DaemonSet Predicates failed on node %s for ds '%s/%s' for reason: %v", node.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Name, r.GetReason())
|
klog.V(4).Infof("DaemonSet Predicates failed on node %s for ds '%s/%s' for reason: %v", node.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Name, r.GetReason())
|
||||||
switch reason := r.(type) {
|
switch reason := r.(type) {
|
||||||
case *predicates.InsufficientResourceError:
|
|
||||||
insufficientResourceErr = reason
|
|
||||||
case *predicates.PredicateFailureError:
|
case *predicates.PredicateFailureError:
|
||||||
var emitEvent bool
|
|
||||||
// we try to partition predicates into two partitions here: intentional on the part of the operator and not.
|
// we try to partition predicates into two partitions here: intentional on the part of the operator and not.
|
||||||
switch reason {
|
switch reason {
|
||||||
// intentional
|
// intentional
|
||||||
@ -1384,18 +1366,6 @@ func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *apps.
|
|||||||
return false, false, false, nil
|
return false, false, false, nil
|
||||||
}
|
}
|
||||||
wantToRun, shouldSchedule = false, false
|
wantToRun, shouldSchedule = false, false
|
||||||
// unintentional
|
|
||||||
case
|
|
||||||
predicates.ErrDiskConflict,
|
|
||||||
predicates.ErrVolumeZoneConflict,
|
|
||||||
predicates.ErrMaxVolumeCountExceeded,
|
|
||||||
predicates.ErrNodeUnderMemoryPressure,
|
|
||||||
predicates.ErrNodeUnderDiskPressure:
|
|
||||||
// wantToRun and shouldContinueRunning are likely true here. They are
|
|
||||||
// absolutely true at the time of writing the comment. See first comment
|
|
||||||
// of this method.
|
|
||||||
shouldSchedule = false
|
|
||||||
emitEvent = true
|
|
||||||
// unexpected
|
// unexpected
|
||||||
case
|
case
|
||||||
predicates.ErrPodAffinityNotMatch,
|
predicates.ErrPodAffinityNotMatch,
|
||||||
@ -1405,19 +1375,10 @@ func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *apps.
|
|||||||
default:
|
default:
|
||||||
klog.V(4).Infof("unknown predicate failure reason: %s", reason.GetReason())
|
klog.V(4).Infof("unknown predicate failure reason: %s", reason.GetReason())
|
||||||
wantToRun, shouldSchedule, shouldContinueRunning = false, false, false
|
wantToRun, shouldSchedule, shouldContinueRunning = false, false, false
|
||||||
emitEvent = true
|
|
||||||
}
|
|
||||||
if emitEvent {
|
|
||||||
dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedPlacementReason, "failed to place pod on %q: %s", node.ObjectMeta.Name, reason.GetReason())
|
dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedPlacementReason, "failed to place pod on %q: %s", node.ObjectMeta.Name, reason.GetReason())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// only emit this event if insufficient resource is the only thing
|
|
||||||
// preventing the daemon pod from scheduling
|
|
||||||
if shouldSchedule && insufficientResourceErr != nil {
|
|
||||||
dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedPlacementReason, "failed to place pod on %q: %s", node.ObjectMeta.Name, insufficientResourceErr.Error())
|
|
||||||
shouldSchedule = false
|
|
||||||
}
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1471,8 +1432,6 @@ func checkNodeFitness(pod *v1.Pod, meta predicates.PredicateMetadata, nodeInfo *
|
|||||||
func Predicates(pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []predicates.PredicateFailureReason, error) {
|
func Predicates(pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []predicates.PredicateFailureReason, error) {
|
||||||
var predicateFails []predicates.PredicateFailureReason
|
var predicateFails []predicates.PredicateFailureReason
|
||||||
|
|
||||||
// If ScheduleDaemonSetPods is enabled, only check nodeSelector, nodeAffinity and toleration/taint match.
|
|
||||||
if utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) {
|
|
||||||
fit, reasons, err := checkNodeFitness(pod, nil, nodeInfo)
|
fit, reasons, err := checkNodeFitness(pod, nil, nodeInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, predicateFails, err
|
return false, predicateFails, err
|
||||||
@ -1484,32 +1443,6 @@ func Predicates(pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []pred
|
|||||||
return len(predicateFails) == 0, predicateFails, nil
|
return len(predicateFails) == 0, predicateFails, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
critical := kubelettypes.IsCriticalPod(pod)
|
|
||||||
|
|
||||||
fit, reasons, err := predicates.PodToleratesNodeTaints(pod, nil, nodeInfo)
|
|
||||||
if err != nil {
|
|
||||||
return false, predicateFails, err
|
|
||||||
}
|
|
||||||
if !fit {
|
|
||||||
predicateFails = append(predicateFails, reasons...)
|
|
||||||
}
|
|
||||||
if critical {
|
|
||||||
// If the pod is marked as critical and support for critical pod annotations is enabled,
|
|
||||||
// check predicates for critical pods only.
|
|
||||||
fit, reasons, err = predicates.EssentialPredicates(pod, nil, nodeInfo)
|
|
||||||
} else {
|
|
||||||
fit, reasons, err = predicates.GeneralPredicates(pod, nil, nodeInfo)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return false, predicateFails, err
|
|
||||||
}
|
|
||||||
if !fit {
|
|
||||||
predicateFails = append(predicateFails, reasons...)
|
|
||||||
}
|
|
||||||
|
|
||||||
return len(predicateFails) == 0, predicateFails, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type podByCreationTimestampAndPhase []*v1.Pod
|
type podByCreationTimestampAndPhase []*v1.Pod
|
||||||
|
|
||||||
func (o podByCreationTimestampAndPhase) Len() int { return len(o) }
|
func (o podByCreationTimestampAndPhase) Len() int { return len(o) }
|
||||||
|
File diff suppressed because it is too large
Load Diff
@ -39,7 +39,6 @@ go_test(
|
|||||||
srcs = ["daemonset_util_test.go"],
|
srcs = ["daemonset_util_test.go"],
|
||||||
embed = [":go_default_library"],
|
embed = [":go_default_library"],
|
||||||
deps = [
|
deps = [
|
||||||
"//pkg/features:go_default_library",
|
|
||||||
"//pkg/scheduler/api:go_default_library",
|
"//pkg/scheduler/api:go_default_library",
|
||||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/api/extensions/v1beta1:go_default_library",
|
"//staging/src/k8s.io/api/extensions/v1beta1:go_default_library",
|
||||||
|
@ -21,7 +21,7 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
apps "k8s.io/api/apps/v1"
|
apps "k8s.io/api/apps/v1"
|
||||||
"k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
extensions "k8s.io/api/extensions/v1beta1"
|
extensions "k8s.io/api/extensions/v1beta1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
||||||
@ -204,7 +204,7 @@ func GetTargetNodeName(pod *v1.Pod) (string, error) {
|
|||||||
return pod.Spec.NodeName, nil
|
return pod.Spec.NodeName, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// If ScheduleDaemonSetPods was enabled before, retrieve node name of unscheduled pods from NodeAffinity
|
// Retrieve node name of unscheduled pods from NodeAffinity
|
||||||
if pod.Spec.Affinity == nil ||
|
if pod.Spec.Affinity == nil ||
|
||||||
pod.Spec.Affinity.NodeAffinity == nil ||
|
pod.Spec.Affinity.NodeAffinity == nil ||
|
||||||
pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
|
pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
|
||||||
|
@ -21,13 +21,12 @@ import (
|
|||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
extensions "k8s.io/api/extensions/v1beta1"
|
extensions "k8s.io/api/extensions/v1beta1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||||
"k8s.io/component-base/featuregate"
|
"k8s.io/component-base/featuregate"
|
||||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||||
"k8s.io/kubernetes/pkg/features"
|
|
||||||
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
|
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
|
||||||
utilpointer "k8s.io/utils/pointer"
|
utilpointer "k8s.io/utils/pointer"
|
||||||
)
|
)
|
||||||
@ -586,5 +585,5 @@ func TestGetTargetNodeName(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
forEachFeatureGate(t, testFun, features.ScheduleDaemonSetPods)
|
forEachFeatureGate(t, testFun)
|
||||||
}
|
}
|
||||||
|
@ -241,6 +241,7 @@ const (
|
|||||||
|
|
||||||
// owner: @k82cn
|
// owner: @k82cn
|
||||||
// beta: v1.12
|
// beta: v1.12
|
||||||
|
// GA: v1.17
|
||||||
//
|
//
|
||||||
// Schedule DaemonSet Pods by default scheduler instead of DaemonSet controller
|
// Schedule DaemonSet Pods by default scheduler instead of DaemonSet controller
|
||||||
ScheduleDaemonSetPods featuregate.Feature = "ScheduleDaemonSetPods"
|
ScheduleDaemonSetPods featuregate.Feature = "ScheduleDaemonSetPods"
|
||||||
@ -534,7 +535,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
|
|||||||
SupportPodPidsLimit: {Default: true, PreRelease: featuregate.Beta},
|
SupportPodPidsLimit: {Default: true, PreRelease: featuregate.Beta},
|
||||||
SupportNodePidsLimit: {Default: true, PreRelease: featuregate.Beta},
|
SupportNodePidsLimit: {Default: true, PreRelease: featuregate.Beta},
|
||||||
HyperVContainer: {Default: false, PreRelease: featuregate.Alpha},
|
HyperVContainer: {Default: false, PreRelease: featuregate.Alpha},
|
||||||
ScheduleDaemonSetPods: {Default: true, PreRelease: featuregate.Beta},
|
ScheduleDaemonSetPods: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.18
|
||||||
TokenRequest: {Default: true, PreRelease: featuregate.Beta},
|
TokenRequest: {Default: true, PreRelease: featuregate.Beta},
|
||||||
TokenRequestProjection: {Default: true, PreRelease: featuregate.Beta},
|
TokenRequestProjection: {Default: true, PreRelease: featuregate.Beta},
|
||||||
BoundServiceAccountTokenVolume: {Default: false, PreRelease: featuregate.Alpha},
|
BoundServiceAccountTokenVolume: {Default: false, PreRelease: featuregate.Alpha},
|
||||||
|
@ -18,7 +18,6 @@ go_test(
|
|||||||
"//pkg/api/v1/pod:go_default_library",
|
"//pkg/api/v1/pod:go_default_library",
|
||||||
"//pkg/controller:go_default_library",
|
"//pkg/controller:go_default_library",
|
||||||
"//pkg/controller/daemon:go_default_library",
|
"//pkg/controller/daemon:go_default_library",
|
||||||
"//pkg/features:go_default_library",
|
|
||||||
"//pkg/scheduler:go_default_library",
|
"//pkg/scheduler:go_default_library",
|
||||||
"//pkg/scheduler/algorithmprovider:go_default_library",
|
"//pkg/scheduler/algorithmprovider:go_default_library",
|
||||||
"//pkg/scheduler/api:go_default_library",
|
"//pkg/scheduler/api:go_default_library",
|
||||||
@ -32,7 +31,6 @@ go_test(
|
|||||||
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
|
||||||
"//staging/src/k8s.io/client-go/informers:go_default_library",
|
"//staging/src/k8s.io/client-go/informers:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/kubernetes/typed/apps/v1:go_default_library",
|
"//staging/src/k8s.io/client-go/kubernetes/typed/apps/v1:go_default_library",
|
||||||
@ -42,8 +40,6 @@ go_test(
|
|||||||
"//staging/src/k8s.io/client-go/tools/events:go_default_library",
|
"//staging/src/k8s.io/client-go/tools/events:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library",
|
"//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/util/retry:go_default_library",
|
"//staging/src/k8s.io/client-go/util/retry:go_default_library",
|
||||||
"//staging/src/k8s.io/component-base/featuregate:go_default_library",
|
|
||||||
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
|
|
||||||
"//test/integration/framework:go_default_library",
|
"//test/integration/framework:go_default_library",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -31,7 +31,6 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/util/intstr"
|
"k8s.io/apimachinery/pkg/util/intstr"
|
||||||
"k8s.io/apimachinery/pkg/util/uuid"
|
"k8s.io/apimachinery/pkg/util/uuid"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
|
||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
appstyped "k8s.io/client-go/kubernetes/typed/apps/v1"
|
appstyped "k8s.io/client-go/kubernetes/typed/apps/v1"
|
||||||
@ -41,13 +40,10 @@ import (
|
|||||||
"k8s.io/client-go/tools/events"
|
"k8s.io/client-go/tools/events"
|
||||||
"k8s.io/client-go/util/flowcontrol"
|
"k8s.io/client-go/util/flowcontrol"
|
||||||
"k8s.io/client-go/util/retry"
|
"k8s.io/client-go/util/retry"
|
||||||
"k8s.io/component-base/featuregate"
|
|
||||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
|
||||||
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
||||||
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
||||||
"k8s.io/kubernetes/pkg/controller"
|
"k8s.io/kubernetes/pkg/controller"
|
||||||
"k8s.io/kubernetes/pkg/controller/daemon"
|
"k8s.io/kubernetes/pkg/controller/daemon"
|
||||||
"k8s.io/kubernetes/pkg/features"
|
|
||||||
"k8s.io/kubernetes/pkg/scheduler"
|
"k8s.io/kubernetes/pkg/scheduler"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
|
"k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
|
||||||
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
|
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
|
||||||
@ -90,12 +86,6 @@ func setupScheduler(
|
|||||||
cs clientset.Interface,
|
cs clientset.Interface,
|
||||||
informerFactory informers.SharedInformerFactory,
|
informerFactory informers.SharedInformerFactory,
|
||||||
) (restoreFeatureGates func()) {
|
) (restoreFeatureGates func()) {
|
||||||
restoreFeatureGates = func() {}
|
|
||||||
// If ScheduleDaemonSetPods is disabled, do not start scheduler.
|
|
||||||
if !utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Enable Features.
|
// Enable Features.
|
||||||
restoreFeatureGates = algorithmprovider.ApplyFeatureGates()
|
restoreFeatureGates = algorithmprovider.ApplyFeatureGates()
|
||||||
|
|
||||||
@ -221,12 +211,6 @@ func updateStrategies() []*apps.DaemonSetUpdateStrategy {
|
|||||||
return []*apps.DaemonSetUpdateStrategy{newOnDeleteStrategy(), newRollbackStrategy()}
|
return []*apps.DaemonSetUpdateStrategy{newOnDeleteStrategy(), newRollbackStrategy()}
|
||||||
}
|
}
|
||||||
|
|
||||||
func featureGates() []featuregate.Feature {
|
|
||||||
return []featuregate.Feature{
|
|
||||||
features.ScheduleDaemonSetPods,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func allocatableResources(memory, cpu string) v1.ResourceList {
|
func allocatableResources(memory, cpu string) v1.ResourceList {
|
||||||
return v1.ResourceList{
|
return v1.ResourceList{
|
||||||
v1.ResourceMemory: resource.MustParse(memory),
|
v1.ResourceMemory: resource.MustParse(memory),
|
||||||
@ -427,31 +411,6 @@ func validateDaemonSetStatus(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func validateFailedPlacementEvent(eventClient corev1client.EventInterface, t *testing.T) {
|
|
||||||
if err := wait.Poll(5*time.Second, 60*time.Second, func() (bool, error) {
|
|
||||||
eventList, err := eventClient.List(metav1.ListOptions{})
|
|
||||||
if err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
if len(eventList.Items) == 0 {
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
if len(eventList.Items) > 1 {
|
|
||||||
t.Errorf("Expected 1 event got %d", len(eventList.Items))
|
|
||||||
}
|
|
||||||
event := eventList.Items[0]
|
|
||||||
if event.Type != v1.EventTypeWarning {
|
|
||||||
t.Errorf("Event type expected %s got %s", v1.EventTypeWarning, event.Type)
|
|
||||||
}
|
|
||||||
if event.Reason != daemon.FailedPlacementReason {
|
|
||||||
t.Errorf("Event reason expected %s got %s", daemon.FailedPlacementReason, event.Reason)
|
|
||||||
}
|
|
||||||
return true, nil
|
|
||||||
}); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func updateDS(t *testing.T, dsClient appstyped.DaemonSetInterface, dsName string, updateFunc func(*apps.DaemonSet)) *apps.DaemonSet {
|
func updateDS(t *testing.T, dsClient appstyped.DaemonSetInterface, dsName string, updateFunc func(*apps.DaemonSet)) *apps.DaemonSet {
|
||||||
var ds *apps.DaemonSet
|
var ds *apps.DaemonSet
|
||||||
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
|
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
|
||||||
@ -468,17 +427,6 @@ func updateDS(t *testing.T, dsClient appstyped.DaemonSetInterface, dsName string
|
|||||||
return ds
|
return ds
|
||||||
}
|
}
|
||||||
|
|
||||||
func forEachFeatureGate(t *testing.T, tf func(t *testing.T)) {
|
|
||||||
for _, fg := range featureGates() {
|
|
||||||
for _, f := range []bool{true, false} {
|
|
||||||
func() {
|
|
||||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, fg, f)()
|
|
||||||
t.Run(fmt.Sprintf("%v (%t)", fg, f), tf)
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func forEachStrategy(t *testing.T, tf func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy)) {
|
func forEachStrategy(t *testing.T, tf func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy)) {
|
||||||
for _, strategy := range updateStrategies() {
|
for _, strategy := range updateStrategies() {
|
||||||
t.Run(fmt.Sprintf("%s (%v)", t.Name(), strategy),
|
t.Run(fmt.Sprintf("%s (%v)", t.Name(), strategy),
|
||||||
@ -487,7 +435,6 @@ func forEachStrategy(t *testing.T, tf func(t *testing.T, strategy *apps.DaemonSe
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestOneNodeDaemonLaunchesPod(t *testing.T) {
|
func TestOneNodeDaemonLaunchesPod(t *testing.T) {
|
||||||
forEachFeatureGate(t, func(t *testing.T) {
|
|
||||||
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
|
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
|
||||||
server, closeFn, dc, informers, clientset := setup(t)
|
server, closeFn, dc, informers, clientset := setup(t)
|
||||||
defer closeFn()
|
defer closeFn()
|
||||||
@ -524,11 +471,9 @@ func TestOneNodeDaemonLaunchesPod(t *testing.T) {
|
|||||||
validateDaemonSetPodsAndMarkReady(podClient, podInformer, 1, t)
|
validateDaemonSetPodsAndMarkReady(podClient, podInformer, 1, t)
|
||||||
validateDaemonSetStatus(dsClient, ds.Name, 1, t)
|
validateDaemonSetStatus(dsClient, ds.Name, 1, t)
|
||||||
})
|
})
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSimpleDaemonSetLaunchesPods(t *testing.T) {
|
func TestSimpleDaemonSetLaunchesPods(t *testing.T) {
|
||||||
forEachFeatureGate(t, func(t *testing.T) {
|
|
||||||
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
|
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
|
||||||
server, closeFn, dc, informers, clientset := setup(t)
|
server, closeFn, dc, informers, clientset := setup(t)
|
||||||
defer closeFn()
|
defer closeFn()
|
||||||
@ -562,11 +507,9 @@ func TestSimpleDaemonSetLaunchesPods(t *testing.T) {
|
|||||||
validateDaemonSetPodsAndMarkReady(podClient, podInformer, 5, t)
|
validateDaemonSetPodsAndMarkReady(podClient, podInformer, 5, t)
|
||||||
validateDaemonSetStatus(dsClient, ds.Name, 5, t)
|
validateDaemonSetStatus(dsClient, ds.Name, 5, t)
|
||||||
})
|
})
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDaemonSetWithNodeSelectorLaunchesPods(t *testing.T) {
|
func TestDaemonSetWithNodeSelectorLaunchesPods(t *testing.T) {
|
||||||
forEachFeatureGate(t, func(t *testing.T) {
|
|
||||||
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
|
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
|
||||||
server, closeFn, dc, informers, clientset := setup(t)
|
server, closeFn, dc, informers, clientset := setup(t)
|
||||||
defer closeFn()
|
defer closeFn()
|
||||||
@ -633,7 +576,6 @@ func TestDaemonSetWithNodeSelectorLaunchesPods(t *testing.T) {
|
|||||||
validateDaemonSetPodsAndMarkReady(podClient, podInformer, 3, t)
|
validateDaemonSetPodsAndMarkReady(podClient, podInformer, 3, t)
|
||||||
validateDaemonSetStatus(dsClient, ds.Name, 3, t)
|
validateDaemonSetStatus(dsClient, ds.Name, 3, t)
|
||||||
})
|
})
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNotReadyNodeDaemonDoesLaunchPod(t *testing.T) {
|
func TestNotReadyNodeDaemonDoesLaunchPod(t *testing.T) {
|
||||||
@ -680,52 +622,10 @@ func TestNotReadyNodeDaemonDoesLaunchPod(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// When ScheduleDaemonSetPods is disabled, DaemonSets should not launch onto nodes with insufficient capacity.
|
// TestInsufficientCapacityNodeDaemonSetCreateButNotLaunchPod tests thaat the DaemonSet should create
|
||||||
// Look for TestInsufficientCapacityNodeWhenScheduleDaemonSetPodsEnabled, we don't need this test anymore.
|
// Pods for all the nodes regardless of available resource on the nodes, and kube-scheduler should
|
||||||
func TestInsufficientCapacityNodeDaemonDoesNotLaunchPod(t *testing.T) {
|
// not schedule Pods onto the nodes with insufficient resource.
|
||||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ScheduleDaemonSetPods, false)()
|
func TestInsufficientCapacityNode(t *testing.T) {
|
||||||
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
|
|
||||||
server, closeFn, dc, informers, clientset := setup(t)
|
|
||||||
defer closeFn()
|
|
||||||
ns := framework.CreateTestingNamespace("insufficient-capacity", server, t)
|
|
||||||
defer framework.DeleteTestingNamespace(ns, server, t)
|
|
||||||
|
|
||||||
dsClient := clientset.AppsV1().DaemonSets(ns.Name)
|
|
||||||
nodeClient := clientset.CoreV1().Nodes()
|
|
||||||
eventClient := clientset.CoreV1().Events(ns.Namespace)
|
|
||||||
|
|
||||||
stopCh := make(chan struct{})
|
|
||||||
defer close(stopCh)
|
|
||||||
|
|
||||||
informers.Start(stopCh)
|
|
||||||
go dc.Run(5, stopCh)
|
|
||||||
|
|
||||||
ds := newDaemonSet("foo", ns.Name)
|
|
||||||
ds.Spec.Template.Spec = resourcePodSpec("node-with-limited-memory", "120M", "75m")
|
|
||||||
ds.Spec.UpdateStrategy = *strategy
|
|
||||||
_, err := dsClient.Create(ds)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to create DaemonSet: %v", err)
|
|
||||||
}
|
|
||||||
defer cleanupDaemonSets(t, clientset, ds)
|
|
||||||
|
|
||||||
node := newNode("node-with-limited-memory", nil)
|
|
||||||
node.Status.Allocatable = allocatableResources("100M", "200m")
|
|
||||||
_, err = nodeClient.Create(node)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to create node: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
validateFailedPlacementEvent(eventClient, t)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestInsufficientCapacityNodeDaemonSetCreateButNotLaunchPod tests that when "ScheduleDaemonSetPods"
|
|
||||||
// feature is enabled, the DaemonSet should create Pods for all the nodes regardless of available resource
|
|
||||||
// on the nodes, and kube-scheduler should not schedule Pods onto the nodes with insufficient resource.
|
|
||||||
func TestInsufficientCapacityNodeWhenScheduleDaemonSetPodsEnabled(t *testing.T) {
|
|
||||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ScheduleDaemonSetPods, true)()
|
|
||||||
|
|
||||||
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
|
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
|
||||||
server, closeFn, dc, informers, clientset := setup(t)
|
server, closeFn, dc, informers, clientset := setup(t)
|
||||||
defer closeFn()
|
defer closeFn()
|
||||||
@ -782,8 +682,7 @@ func TestInsufficientCapacityNodeWhenScheduleDaemonSetPodsEnabled(t *testing.T)
|
|||||||
t.Fatalf("Failed to create node: %v", err)
|
t.Fatalf("Failed to create node: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// When ScheduleDaemonSetPods enabled, 2 pods are created. But only one
|
// 2 pods are created. But only one of two Pods is scheduled by default scheduler.
|
||||||
// of two Pods is scheduled by default scheduler.
|
|
||||||
validateDaemonSetPodsAndMarkReady(podClient, podInformer, 2, t)
|
validateDaemonSetPodsAndMarkReady(podClient, podInformer, 2, t)
|
||||||
validateDaemonSetStatus(dsClient, ds.Name, 1, t)
|
validateDaemonSetStatus(dsClient, ds.Name, 1, t)
|
||||||
})
|
})
|
||||||
@ -898,10 +797,8 @@ func TestLaunchWithHashCollision(t *testing.T) {
|
|||||||
validateDaemonSetCollisionCount(dsClient, ds.Name, orgCollisionCount+1, t)
|
validateDaemonSetCollisionCount(dsClient, ds.Name, orgCollisionCount+1, t)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestTaintedNode tests that no matter "ScheduleDaemonSetPods" feature is enabled or not
|
// TestTaintedNode tests tainted node isn't expected to have pod scheduled
|
||||||
// tainted node isn't expected to have pod scheduled
|
|
||||||
func TestTaintedNode(t *testing.T) {
|
func TestTaintedNode(t *testing.T) {
|
||||||
forEachFeatureGate(t, func(t *testing.T) {
|
|
||||||
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
|
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
|
||||||
server, closeFn, dc, informers, clientset := setup(t)
|
server, closeFn, dc, informers, clientset := setup(t)
|
||||||
defer closeFn()
|
defer closeFn()
|
||||||
@ -962,13 +859,11 @@ func TestTaintedNode(t *testing.T) {
|
|||||||
validateDaemonSetPodsAndMarkReady(podClient, podInformer, 2, t)
|
validateDaemonSetPodsAndMarkReady(podClient, podInformer, 2, t)
|
||||||
validateDaemonSetStatus(dsClient, ds.Name, 2, t)
|
validateDaemonSetStatus(dsClient, ds.Name, 2, t)
|
||||||
})
|
})
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestUnschedulableNodeDaemonDoesLaunchPod tests that the DaemonSet Pods can still be scheduled
|
// TestUnschedulableNodeDaemonDoesLaunchPod tests that the DaemonSet Pods can still be scheduled
|
||||||
// to the Unschedulable nodes.
|
// to the Unschedulable nodes.
|
||||||
func TestUnschedulableNodeDaemonDoesLaunchPod(t *testing.T) {
|
func TestUnschedulableNodeDaemonDoesLaunchPod(t *testing.T) {
|
||||||
forEachFeatureGate(t, func(t *testing.T) {
|
|
||||||
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
|
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
|
||||||
server, closeFn, dc, informers, clientset := setup(t)
|
server, closeFn, dc, informers, clientset := setup(t)
|
||||||
defer closeFn()
|
defer closeFn()
|
||||||
@ -1035,5 +930,4 @@ func TestUnschedulableNodeDaemonDoesLaunchPod(t *testing.T) {
|
|||||||
validateDaemonSetPodsAndMarkReady(podClient, podInformer, 2, t)
|
validateDaemonSetPodsAndMarkReady(podClient, podInformer, 2, t)
|
||||||
validateDaemonSetStatus(dsClient, ds.Name, 2, t)
|
validateDaemonSetStatus(dsClient, ds.Name, 2, t)
|
||||||
})
|
})
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user