From 7b6293b6b6a5662fc37f440e839cf5da8b96e935 Mon Sep 17 00:00:00 2001 From: Wei Huang Date: Fri, 28 Oct 2022 15:05:46 -0400 Subject: [PATCH] APIs, Validation and condition enforcements - New API field .spec.schedulingGates - Validation and drop disabled fields - Disallow binding a Pod carrying non-nil schedulingGates - Disallow creating a Pod with non-nil nodeName and non-nil schedulingGates - Adds a {type:PodScheduled, reason:WaitingForGates} condition if necessary - New literal SchedulingGated in the STATUS column of `k get pod` --- pkg/api/pod/util.go | 13 ++ pkg/api/pod/util_test.go | 82 +++++++ pkg/apis/core/types.go | 28 ++- pkg/apis/core/validation/validation.go | 68 +++++- pkg/apis/core/validation/validation_test.go | 203 +++++++++++++++++- pkg/features/kube_features.go | 9 + pkg/printers/internalversion/printers.go | 7 + pkg/printers/internalversion/printers_test.go | 18 ++ pkg/registry/core/pod/storage/storage.go | 6 + pkg/registry/core/pod/storage/storage_test.go | 75 +++++++ pkg/registry/core/pod/strategy.go | 26 +++ pkg/registry/core/pod/strategy_test.go | 69 ++++++ staging/src/k8s.io/api/core/v1/types.go | 29 ++- test/e2e/framework/pod/wait_test.go | 1 + 14 files changed, 613 insertions(+), 21 deletions(-) diff --git a/pkg/api/pod/util.go b/pkg/api/pod/util.go index 8f1a63bb719..1bdbd4523eb 100644 --- a/pkg/api/pod/util.go +++ b/pkg/api/pod/util.go @@ -537,6 +537,11 @@ func dropDisabledFields( } } + // If the feature is disabled and not in use, drop the schedulingGates field. + if !utilfeature.DefaultFeatureGate.Enabled(features.PodSchedulingReadiness) && !schedulingGatesInUse(oldPodSpec) { + podSpec.SchedulingGates = nil + } + dropDisabledProcMountField(podSpec, oldPodSpec) dropDisabledTopologySpreadConstraintsFields(podSpec, oldPodSpec) @@ -719,6 +724,14 @@ func probeGracePeriodInUse(podSpec *api.PodSpec) bool { return inUse } +// schedulingGatesInUse returns true if the pod spec is non-nil and it has SchedulingGates field set. 
+func schedulingGatesInUse(podSpec *api.PodSpec) bool { + if podSpec == nil { + return false + } + return len(podSpec.SchedulingGates) != 0 +} + // SeccompAnnotationForField takes a pod seccomp profile field and returns the // converted annotation value func SeccompAnnotationForField(field *api.SeccompProfile) string { diff --git a/pkg/api/pod/util_test.go b/pkg/api/pod/util_test.go index 15ee5d5705d..1dfef8aa245 100644 --- a/pkg/api/pod/util_test.go +++ b/pkg/api/pod/util_test.go @@ -1935,3 +1935,85 @@ func TestDropHostUsers(t *testing.T) { } } + +func TestDropSchedulingGates(t *testing.T) { + podWithSchedulingGates := func() *api.Pod { + return &api.Pod{ + Spec: api.PodSpec{ + SchedulingGates: []api.PodSchedulingGate{ + {Name: "foo"}, + {Name: "bar"}, + }, + }, + } + } + podWithoutSchedulingGates := func() *api.Pod { return &api.Pod{} } + + podInfo := []struct { + description string + hasSchedulingGatesField bool + pod func() *api.Pod + }{ + { + description: "has SchedulingGates field", + hasSchedulingGatesField: true, + pod: podWithSchedulingGates, + }, + { + description: "does not have SchedulingGates field", + hasSchedulingGatesField: false, + pod: podWithoutSchedulingGates, + }, + { + description: "is nil", + hasSchedulingGatesField: false, + pod: func() *api.Pod { return nil }, + }, + } + + for _, enabled := range []bool{true, false} { + for _, oldPodInfo := range podInfo { + for _, newPodInfo := range podInfo { + oldPodHasSchedulingGates, oldPod := oldPodInfo.hasSchedulingGatesField, oldPodInfo.pod() + newPodHasSchedulingGates, newPod := newPodInfo.hasSchedulingGatesField, newPodInfo.pod() + if newPod == nil { + continue + } + + t.Run(fmt.Sprintf("feature enabled=%v, old pod %v, new pod %v", enabled, oldPodInfo.description, newPodInfo.description), func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodSchedulingReadiness, enabled)() + var oldPodSpec *api.PodSpec + if oldPod != nil { + 
oldPodSpec = &oldPod.Spec + } + dropDisabledFields(&newPod.Spec, nil, oldPodSpec, nil) + // Old Pod should never be changed. + if diff := cmp.Diff(oldPod, oldPodInfo.pod()); diff != "" { + t.Errorf("old pod changed: %v", diff) + } + switch { + case enabled || oldPodHasSchedulingGates: + // New Pod should not be changed if the feature is enabled, or if the old Pod had schedulingGates. + if diff := cmp.Diff(newPod, newPodInfo.pod()); diff != "" { + t.Errorf("new pod changed: %v", diff) + } + case newPodHasSchedulingGates: + // New Pod should be changed. + if reflect.DeepEqual(newPod, newPodInfo.pod()) { + t.Errorf("new pod was not changed") + } + // New Pod should not have SchedulingGates field. + if diff := cmp.Diff(newPod, podWithoutSchedulingGates()); diff != "" { + t.Errorf("new pod has SchedulingGates field: %v", diff) + } + default: + // New pod should not need to be changed. + if diff := cmp.Diff(newPod, newPodInfo.pod()); diff != "" { + t.Errorf("new pod changed: %v", diff) + } + } + }) + } + } + } +} diff --git a/pkg/apis/core/types.go b/pkg/apis/core/types.go index ec105a41f44..829fd29e59d 100644 --- a/pkg/apis/core/types.go +++ b/pkg/apis/core/types.go @@ -291,7 +291,7 @@ type PersistentVolume struct { // +optional metav1.ObjectMeta - //Spec defines a persistent volume owned by the cluster + // Spec defines a persistent volume owned by the cluster // +optional Spec PersistentVolumeSpec @@ -1977,10 +1977,10 @@ type EnvFromSource struct { // +optional Prefix string // The ConfigMap to select from. - //+optional + // +optional ConfigMapRef *ConfigMapEnvSource // The Secret to select from. - //+optional + // +optional SecretRef *SecretEnvSource } @@ -2428,6 +2428,9 @@ const ( // PodReasonUnschedulable reason in PodScheduled PodCondition means that the scheduler // can't schedule the pod right now, for example due to insufficient resources in the cluster. 
PodReasonUnschedulable = "Unschedulable" + // PodReasonSchedulingGated reason in PodScheduled PodCondition means that the scheduler + // skips scheduling the pod because one or more scheduling gates are still present. + PodReasonSchedulingGated = "SchedulingGated" // ContainersReady indicates whether all containers in the pod are ready. ContainersReady PodConditionType = "ContainersReady" // AlphaNoCompatGuaranteeDisruptionTarget indicates the pod is about to be deleted due to a @@ -2502,7 +2505,7 @@ const ( // over a set of nodes; that is, it represents the OR of the selectors represented // by the node selector terms. type NodeSelector struct { - //Required. A list of node selector terms. The terms are ORed. + // Required. A list of node selector terms. The terms are ORed. NodeSelectorTerms []NodeSelectorTerm } @@ -2997,6 +3000,12 @@ type PodSpec struct { // - spec.containers[*].securityContext.runAsGroup // +optional OS *PodOS + // SchedulingGates is an opaque list of values that if specified will block scheduling the pod. + // More info: https://git.k8s.io/enhancements/keps/sig-scheduling/3521-pod-scheduling-readiness. + // + // This is an alpha-level feature enabled by PodSchedulingReadiness feature gate. + // +optional + SchedulingGates []PodSchedulingGate } // OSName is the set of OS'es that can be used in OS. @@ -3017,6 +3026,13 @@ type PodOS struct { Name OSName } +// PodSchedulingGate is associated to a Pod to guard its scheduling. +type PodSchedulingGate struct { + // Name of the scheduling gate. + // Each scheduling gate must have a unique name field. + Name string +} + // HostAlias holds the mapping between IP and hostnames that will be injected as an entry in the // pod's hosts file. type HostAlias struct { @@ -3494,7 +3510,7 @@ type ReplicationControllerSpec struct { // insufficient replicas are detected. This reference is ignored if a Template is set. 
// Must be set before converting to a versioned API object // +optional - //TemplateRef *ObjectReference + // TemplateRef *ObjectReference // Template is the object that describes the pod that will be created if // insufficient replicas are detected. Internally, this takes precedence over a @@ -4830,7 +4846,7 @@ type ObjectReference struct { // LocalObjectReference contains enough information to let you locate the referenced object inside the same namespace. type LocalObjectReference struct { - //TODO: Add other useful fields. apiVersion, kind, uid? + // TODO: Add other useful fields. apiVersion, kind, uid? Name string } diff --git a/pkg/apis/core/validation/validation.go b/pkg/apis/core/validation/validation.go index 53ae4988a8c..8e6d4ecde7f 100644 --- a/pkg/apis/core/validation/validation.go +++ b/pkg/apis/core/validation/validation.go @@ -1013,7 +1013,7 @@ func validateGlusterfsPersistentVolumeSource(glusterfs *core.GlusterfsPersistent func validateFlockerVolumeSource(flocker *core.FlockerVolumeSource, fldPath *field.Path) field.ErrorList { allErrs := field.ErrorList{} if len(flocker.DatasetName) == 0 && len(flocker.DatasetUUID) == 0 { - //TODO: consider adding a RequiredOneOf() error for this and similar cases + // TODO: consider adding a RequiredOneOf() error for this and similar cases allErrs = append(allErrs, field.Required(fldPath, "one of datasetName and datasetUUID is required")) } if len(flocker.DatasetName) != 0 && len(flocker.DatasetUUID) != 0 { @@ -3272,6 +3272,22 @@ func validateReadinessGates(readinessGates []core.PodReadinessGate, fldPath *fie return allErrs } +func validateSchedulingGates(schedulingGates []core.PodSchedulingGate, fldPath *field.Path) field.ErrorList { + allErrs := field.ErrorList{} + // There should be no duplicates in the list of scheduling gates. 
+ seen := sets.String{} + for i, schedulingGate := range schedulingGates { + if schedulingGate.Name == "" { + allErrs = append(allErrs, field.Required(fldPath.Index(i), "must not be empty")) + } + if seen.Has(schedulingGate.Name) { + allErrs = append(allErrs, field.Duplicate(fldPath.Index(i), schedulingGate.Name)) + } + seen.Insert(schedulingGate.Name) + } + return allErrs +} + func validatePodDNSConfig(dnsConfig *core.PodDNSConfig, dnsPolicy *core.DNSPolicy, fldPath *field.Path, opts PodValidationOptions) field.ErrorList { allErrs := field.ErrorList{} @@ -3420,6 +3436,28 @@ func validateOnlyAddedTolerations(newTolerations []core.Toleration, oldToleratio return allErrs } +func validateOnlyDeletedSchedulingGates(newGates, oldGates []core.PodSchedulingGate, fldPath *field.Path) field.ErrorList { + allErrs := field.ErrorList{} + if len(newGates) == 0 { + return allErrs + } + + additionalGates := make(map[string]int) + for i, newGate := range newGates { + additionalGates[newGate.Name] = i + } + + for _, oldGate := range oldGates { + delete(additionalGates, oldGate.Name) + } + + for gate, i := range additionalGates { + allErrs = append(allErrs, field.Forbidden(fldPath.Index(i).Child("name"), fmt.Sprintf("only deletion is allowed, but found new scheduling gate '%s'", gate))) + } + + return allErrs +} + func ValidateHostAliases(hostAliases []core.HostAlias, fldPath *field.Path) field.ErrorList { allErrs := field.ErrorList{} for _, hostAlias := range hostAliases { @@ -3577,7 +3615,7 @@ func validatePodIPs(pod *core.Pod) field.ErrorList { } // There should be no duplicates in list of Pod.PodIPs - seen := sets.String{} //:= make(map[string]int) + seen := sets.String{} // := make(map[string]int) for i, podIP := range pod.Status.PodIPs { if seen.Has(podIP.IP) { allErrs = append(allErrs, field.Duplicate(podIPsField.Index(i), podIP)) @@ -3611,6 +3649,7 @@ func ValidatePodSpec(spec *core.PodSpec, podMeta *metav1.ObjectMeta, fldPath *fi allErrs = append(allErrs, 
validateAffinity(spec.Affinity, fldPath.Child("affinity"))...) allErrs = append(allErrs, validatePodDNSConfig(spec.DNSConfig, &spec.DNSPolicy, fldPath.Child("dnsConfig"), opts)...) allErrs = append(allErrs, validateReadinessGates(spec.ReadinessGates, fldPath.Child("readinessGates"))...) + allErrs = append(allErrs, validateSchedulingGates(spec.SchedulingGates, fldPath.Child("schedulingGates"))...) allErrs = append(allErrs, validateTopologySpreadConstraints(spec.TopologySpreadConstraints, fldPath.Child("topologySpreadConstraints"))...) allErrs = append(allErrs, validateWindowsHostProcessPod(spec, fldPath)...) allErrs = append(allErrs, validateHostUsers(spec, fldPath)...) @@ -4016,7 +4055,7 @@ func validatePodAntiAffinity(podAntiAffinity *core.PodAntiAffinity, fldPath *fie // if podAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution != nil { // allErrs = append(allErrs, validatePodAffinityTerms(podAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution, false, // fldPath.Child("requiredDuringSchedulingRequiredDuringExecution"))...) - //} + // } if podAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil { allErrs = append(allErrs, validatePodAffinityTerms(podAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution, fldPath.Child("requiredDuringSchedulingIgnoredDuringExecution"))...) @@ -4051,7 +4090,7 @@ func validatePodAffinity(podAffinity *core.PodAffinity, fldPath *field.Path) fie // if podAffinity.RequiredDuringSchedulingRequiredDuringExecution != nil { // allErrs = append(allErrs, validatePodAffinityTerms(podAffinity.RequiredDuringSchedulingRequiredDuringExecution, false, // fldPath.Child("requiredDuringSchedulingRequiredDuringExecution"))...) - //} + // } if podAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil { allErrs = append(allErrs, validatePodAffinityTerms(podAffinity.RequiredDuringSchedulingIgnoredDuringExecution, fldPath.Child("requiredDuringSchedulingIgnoredDuringExecution"))...) 
@@ -4258,7 +4297,7 @@ func ValidatePodSecurityContext(securityContext *core.PodSecurityContext, spec * func ValidateContainerUpdates(newContainers, oldContainers []core.Container, fldPath *field.Path) (allErrs field.ErrorList, stop bool) { allErrs = field.ErrorList{} if len(newContainers) != len(oldContainers) { - //TODO: Pinpoint the specific container that causes the invalid error after we have strategic merge diff + // TODO: Pinpoint the specific container that causes the invalid error after we have strategic merge diff allErrs = append(allErrs, field.Forbidden(fldPath, "pod updates may not add or remove containers")) return allErrs, true } @@ -4285,6 +4324,11 @@ func ValidatePodCreate(pod *core.Pod, opts PodValidationOptions) field.ErrorList if len(pod.Spec.EphemeralContainers) > 0 { allErrs = append(allErrs, field.Forbidden(fldPath.Child("ephemeralContainers"), "cannot be set on create")) } + // A Pod cannot be assigned a Node if there are remaining scheduling gates. + if utilfeature.DefaultFeatureGate.Enabled(features.PodSchedulingReadiness) && + pod.Spec.NodeName != "" && len(pod.Spec.SchedulingGates) != 0 { + allErrs = append(allErrs, field.Forbidden(fldPath.Child("nodeName"), "cannot be set until all schedulingGates have been cleared")) + } allErrs = append(allErrs, validateSeccompAnnotationsAndFields(pod.ObjectMeta, &pod.Spec, fldPath)...) return allErrs @@ -4370,6 +4414,7 @@ func ValidatePodUpdate(newPod, oldPod *core.Pod, opts PodValidationOptions) fiel // 2. spec.initContainers[*].image // 3. spec.activeDeadlineSeconds // 4. spec.terminationGracePeriodSeconds + // 5. spec.schedulingGates containerErrs, stop := ValidateContainerUpdates(newPod.Spec.Containers, oldPod.Spec.Containers, specPath.Child("containers")) allErrs = append(allErrs, containerErrs...) @@ -4405,6 +4450,9 @@ func ValidatePodUpdate(newPod, oldPod *core.Pod, opts PodValidationOptions) fiel // Allow only additions to tolerations updates. 
allErrs = append(allErrs, validateOnlyAddedTolerations(newPod.Spec.Tolerations, oldPod.Spec.Tolerations, specPath.Child("tolerations"))...) + // Allow only deletions to schedulingGates updates. + allErrs = append(allErrs, validateOnlyDeletedSchedulingGates(newPod.Spec.SchedulingGates, oldPod.Spec.SchedulingGates, specPath.Child("schedulingGates"))...) + // the last thing to check is pod spec equality. If the pod specs are equal, then we can simply return the errors we have // so far and save the cost of a deep copy. if apiequality.Semantic.DeepEqual(newPod.Spec, oldPod.Spec) { @@ -4433,6 +4481,8 @@ func ValidatePodUpdate(newPod, oldPod *core.Pod, opts PodValidationOptions) fiel activeDeadlineSeconds := *oldPod.Spec.ActiveDeadlineSeconds mungedPodSpec.ActiveDeadlineSeconds = &activeDeadlineSeconds } + // munge spec.schedulingGates + mungedPodSpec.SchedulingGates = oldPod.Spec.SchedulingGates // +k8s:verify-mutation:reason=clone // tolerations are checked before the deep copy, so munge those too mungedPodSpec.Tolerations = oldPod.Spec.Tolerations // +k8s:verify-mutation:reason=clone @@ -4444,7 +4494,7 @@ func ValidatePodUpdate(newPod, oldPod *core.Pod, opts PodValidationOptions) fiel if !apiequality.Semantic.DeepEqual(mungedPodSpec, oldPod.Spec) { // This diff isn't perfect, but it's a helluva lot better an "I'm not going to tell you what the difference is". 
- //TODO: Pinpoint the specific field that causes the invalid error after we have strategic merge diff + // TODO: Pinpoint the specific field that causes the invalid error after we have strategic merge diff specDiff := cmp.Diff(oldPod.Spec, mungedPodSpec) allErrs = append(allErrs, field.Forbidden(specPath, fmt.Sprintf("pod updates may not change fields other than `spec.containers[*].image`, `spec.initContainers[*].image`, `spec.activeDeadlineSeconds`, `spec.tolerations` (only additions to existing tolerations) or `spec.terminationGracePeriodSeconds` (allow it to be set to 1 if it was previously negative)\n%v", specDiff))) } @@ -6121,7 +6171,7 @@ func validateEndpointSubsets(subsets []core.EndpointSubset, fldPath *field.Path) // EndpointSubsets must include endpoint address. For headless service, we allow its endpoints not to have ports. if len(ss.Addresses) == 0 && len(ss.NotReadyAddresses) == 0 { - //TODO: consider adding a RequiredOneOf() error for this and similar cases + // TODO: consider adding a RequiredOneOf() error for this and similar cases allErrs = append(allErrs, field.Required(idxPath, "must specify `addresses` or `notReadyAddresses`")) } for addr := range ss.Addresses { @@ -6210,7 +6260,7 @@ func validateEndpointPort(port *core.EndpointPort, requireName bool, fldPath *fi // ValidateSecurityContext ensures the security context contains valid settings func ValidateSecurityContext(sc *core.SecurityContext, fldPath *field.Path) field.ErrorList { allErrs := field.ErrorList{} - //this should only be true for testing since SecurityContext is defaulted by the core + // this should only be true for testing since SecurityContext is defaulted by the core if sc == nil { return allErrs } @@ -6726,7 +6776,7 @@ func ValidateServiceClusterIPsRelatedFields(service *core.Service) field.ErrorLi } // IPFamilyPolicy stand alone validation - //note: nil is ok, defaulted in alloc check registry/core/service/* + // note: nil is ok, defaulted in alloc check 
registry/core/service/* if service.Spec.IPFamilyPolicy != nil { // must have a supported value if !supportedServiceIPFamilyPolicy.Has(string(*(service.Spec.IPFamilyPolicy))) { diff --git a/pkg/apis/core/validation/validation_test.go b/pkg/apis/core/validation/validation_test.go index 29c985a216a..9409c245eea 100644 --- a/pkg/apis/core/validation/validation_test.go +++ b/pkg/apis/core/validation/validation_test.go @@ -703,7 +703,7 @@ func TestValidatePersistentVolumeSourceUpdate(t *testing.T) { Namespace: "default", } - //longSecretRef refers to the secretRefs which are validated with IsDNS1123Subdomain + // longSecretRef refers to the secretRefs which are validated with IsDNS1123Subdomain longSecretName := "key-name.example.com" longSecretRef := &core.SecretReference{ Name: longSecretName, @@ -10794,6 +10794,91 @@ func TestValidatePod(t *testing.T) { } } +func TestValidatePodCreateWithSchedulingGates(t *testing.T) { + applyEssentials := func(pod *core.Pod) { + pod.Spec.Containers = []core.Container{ + {Name: "con", Image: "pause", ImagePullPolicy: "IfNotPresent", TerminationMessagePolicy: "File"}, + } + pod.Spec.RestartPolicy = core.RestartPolicyAlways + pod.Spec.DNSPolicy = core.DNSClusterFirst + } + fldPath := field.NewPath("spec") + + tests := []struct { + name string + pod *core.Pod + featureEnabled bool + wantFieldErrors field.ErrorList + }{ + { + name: "create a Pod with nodeName and schedulingGates, feature disabled", + pod: &core.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "pod", Namespace: "ns"}, + Spec: core.PodSpec{ + NodeName: "node", + SchedulingGates: []core.PodSchedulingGate{ + {Name: "foo"}, + }, + }, + }, + featureEnabled: false, + wantFieldErrors: nil, + }, + { + name: "create a Pod with nodeName and schedulingGates, feature enabled", + pod: &core.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "pod", Namespace: "ns"}, + Spec: core.PodSpec{ + NodeName: "node", + SchedulingGates: []core.PodSchedulingGate{ + {Name: "foo"}, + }, + }, + }, + featureEnabled: 
true, + wantFieldErrors: []*field.Error{field.Forbidden(fldPath.Child("nodeName"), "cannot be set until all schedulingGates have been cleared")}, + }, + { + name: "create a Pod with schedulingGates, feature disabled", + pod: &core.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "pod", Namespace: "ns"}, + Spec: core.PodSpec{ + SchedulingGates: []core.PodSchedulingGate{ + {Name: "foo"}, + }, + }, + }, + featureEnabled: false, + wantFieldErrors: nil, + }, + { + name: "create a Pod with schedulingGates, feature enabled", + pod: &core.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "pod", Namespace: "ns"}, + Spec: core.PodSpec{ + SchedulingGates: []core.PodSchedulingGate{ + {Name: "foo"}, + }, + }, + }, + featureEnabled: true, + wantFieldErrors: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodSchedulingReadiness, tt.featureEnabled)() + + applyEssentials(tt.pod) + errs := ValidatePodCreate(tt.pod, PodValidationOptions{}) + if diff := cmp.Diff(tt.wantFieldErrors, errs); diff != "" { + t.Errorf("unexpected field errors (-want, +got):\n%s", diff) + } + }) + } +} + func TestValidatePodUpdate(t *testing.T) { var ( activeDeadlineSecondsZero = int64(0) @@ -11698,6 +11783,54 @@ func TestValidatePodUpdate(t *testing.T) { err: "Forbidden: pod updates may not change fields other than ", test: "update pod spec OS to a valid value, featuregate disabled", }, + { + new: core.Pod{ + Spec: core.PodSpec{ + SchedulingGates: []core.PodSchedulingGate{{Name: "foo"}}, + }, + }, + old: core.Pod{}, + err: "Forbidden: only deletion is allowed, but found new scheduling gate 'foo'", + test: "update pod spec schedulingGates: add new scheduling gate", + }, + { + new: core.Pod{ + Spec: core.PodSpec{ + SchedulingGates: []core.PodSchedulingGate{{Name: "bar"}}, + }, + }, + old: core.Pod{ + Spec: core.PodSpec{ + SchedulingGates: []core.PodSchedulingGate{{Name: "foo"}}, + }, + }, + 
err: "Forbidden: only deletion is allowed, but found new scheduling gate 'bar'", + test: "update pod spec schedulingGates: mutating an existing scheduling gate", + }, + { + new: core.Pod{ + Spec: core.PodSpec{ + SchedulingGates: []core.PodSchedulingGate{{Name: "baz"}}, + }, + }, + old: core.Pod{ + Spec: core.PodSpec{ + SchedulingGates: []core.PodSchedulingGate{{Name: "foo"}, {Name: "bar"}}, + }, + }, + err: "Forbidden: only deletion is allowed, but found new scheduling gate 'baz'", + test: "update pod spec schedulingGates: mutating an existing scheduling gate along with deletion", + }, + { + new: core.Pod{}, + old: core.Pod{ + Spec: core.PodSpec{ + SchedulingGates: []core.PodSchedulingGate{{Name: "foo"}}, + }, + }, + err: "", + test: "update pod spec schedulingGates: legal deletion", + }, } for _, test := range tests { test.new.ObjectMeta.ResourceVersion = "1" @@ -18484,6 +18617,7 @@ func TestValidateOSFields(t *testing.T) { "RestartPolicy", "RuntimeClassName", "SchedulerName", + "SchedulingGates[*].Name", "SecurityContext.RunAsNonRoot", "ServiceAccountName", "SetHostnameAsFQDN", @@ -18520,6 +18654,71 @@ func TestValidateOSFields(t *testing.T) { } } +func TestValidateSchedulingGates(t *testing.T) { + fieldPath := field.NewPath("field") + + tests := []struct { + name string + schedulingGates []core.PodSchedulingGate + wantFieldErrors field.ErrorList + }{ + { + name: "nil gates", + schedulingGates: nil, + wantFieldErrors: field.ErrorList{}, + }, + { + name: "empty string in gates", + schedulingGates: []core.PodSchedulingGate{ + {Name: "foo"}, + {Name: ""}, + }, + wantFieldErrors: []*field.Error{field.Required(fieldPath.Index(1), "must not be empty")}, + }, + { + name: "legal gates", + schedulingGates: []core.PodSchedulingGate{ + {Name: "foo"}, + {Name: "bar"}, + }, + wantFieldErrors: field.ErrorList{}, + }, + { + name: "duplicated gates (single duplication)", + schedulingGates: []core.PodSchedulingGate{ + {Name: "foo"}, + {Name: "bar"}, + {Name: "bar"}, + }, + 
wantFieldErrors: []*field.Error{field.Duplicate(fieldPath.Index(2), "bar")}, + }, + { + name: "duplicated gates (multiple duplications)", + schedulingGates: []core.PodSchedulingGate{ + {Name: "foo"}, + {Name: "bar"}, + {Name: "foo"}, + {Name: "baz"}, + {Name: "foo"}, + {Name: "bar"}, + }, + wantFieldErrors: field.ErrorList{ + field.Duplicate(fieldPath.Index(2), "foo"), + field.Duplicate(fieldPath.Index(4), "foo"), + field.Duplicate(fieldPath.Index(5), "bar"), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + errs := validateSchedulingGates(tt.schedulingGates, fieldPath) + if diff := cmp.Diff(tt.wantFieldErrors, errs); diff != "" { + t.Errorf("unexpected field errors (-want, +got):\n%s", diff) + } + }) + } +} + // collectResourcePaths traverses the object, computing all the struct paths. func collectResourcePaths(t *testing.T, skipRecurseList sets.String, tp reflect.Type, path *field.Path) sets.String { if pathStr := path.String(); len(pathStr) > 0 && skipRecurseList.Has(pathStr) { @@ -18664,7 +18863,7 @@ func TestValidateSecurityContext(t *testing.T) { } } - //setup data + // setup data allSettings := fullValidSC() noCaps := fullValidSC() noCaps.Capabilities = nil diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 947b481d7d4..3c165674fef 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -643,6 +643,13 @@ const ( // sandbox creation and network configuration completes successfully PodHasNetworkCondition featuregate.Feature = "PodHasNetworkCondition" + // owner: @Huang-Wei + // kep: https://kep.k8s.io/3521 + // alpha: v1.26 + // + // Enable users to specify when a Pod is ready for scheduling. 
+ PodSchedulingReadiness featuregate.Feature = "PodSchedulingReadiness" + // owner: @liggitt, @tallclair, sig-auth // alpha: v1.22 // beta: v1.23 @@ -995,6 +1002,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS PodHasNetworkCondition: {Default: false, PreRelease: featuregate.Alpha}, + PodSchedulingReadiness: {Default: false, PreRelease: featuregate.Alpha}, + PodSecurity: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, ProbeTerminationGracePeriod: {Default: true, PreRelease: featuregate.Beta}, // Default to true in beta 1.25 diff --git a/pkg/printers/internalversion/printers.go b/pkg/printers/internalversion/printers.go index 2f917745eb9..4238e9e9050 100644 --- a/pkg/printers/internalversion/printers.go +++ b/pkg/printers/internalversion/printers.go @@ -767,6 +767,13 @@ func printPod(pod *api.Pod, options printers.GenerateOptions) ([]metav1.TableRow reason = pod.Status.Reason } + // If the Pod carries {type:PodScheduled, reason:SchedulingGated}, set reason to 'SchedulingGated'. 
+ for _, condition := range pod.Status.Conditions { + if condition.Type == api.PodScheduled && condition.Reason == api.PodReasonSchedulingGated { + reason = api.PodReasonSchedulingGated + } + } + row := metav1.TableRow{ Object: runtime.RawExtension{Object: pod}, } diff --git a/pkg/printers/internalversion/printers_test.go b/pkg/printers/internalversion/printers_test.go index b9ee04bcc4c..2a9c9ef9c6c 100644 --- a/pkg/printers/internalversion/printers_test.go +++ b/pkg/printers/internalversion/printers_test.go @@ -1502,6 +1502,24 @@ func TestPrintPod(t *testing.T) { }, []metav1.TableRow{{Cells: []interface{}{"test14", "2/2", "Running", "9 (5d ago)", ""}}}, }, + { + // Test PodScheduled condition with reason WaitingForGates + api.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test15"}, + Spec: api.PodSpec{Containers: make([]api.Container, 2)}, + Status: api.PodStatus{ + Phase: "podPhase", + Conditions: []api.PodCondition{ + { + Type: api.PodScheduled, + Status: api.ConditionFalse, + Reason: api.PodReasonSchedulingGated, + }, + }, + }, + }, + []metav1.TableRow{{Cells: []interface{}{"test15", "0/2", api.PodReasonSchedulingGated, "0", ""}}}, + }, } for i, test := range tests { diff --git a/pkg/registry/core/pod/storage/storage.go b/pkg/registry/core/pod/storage/storage.go index db21b59da4d..3960918de62 100644 --- a/pkg/registry/core/pod/storage/storage.go +++ b/pkg/registry/core/pod/storage/storage.go @@ -33,10 +33,12 @@ import ( "k8s.io/apiserver/pkg/storage" storeerr "k8s.io/apiserver/pkg/storage/errors" "k8s.io/apiserver/pkg/util/dryrun" + utilfeature "k8s.io/apiserver/pkg/util/feature" policyclient "k8s.io/client-go/kubernetes/typed/policy/v1" podutil "k8s.io/kubernetes/pkg/api/pod" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/apis/core/validation" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/client" "k8s.io/kubernetes/pkg/printers" printersinternal "k8s.io/kubernetes/pkg/printers/internalversion" @@ -221,6 +223,10 @@ func (r 
*BindingREST) setPodHostAndAnnotations(ctx context.Context, podUID types if pod.Spec.NodeName != "" { return nil, fmt.Errorf("pod %v is already assigned to node %q", pod.Name, pod.Spec.NodeName) } + // Reject binding to a scheduling un-ready Pod. + if utilfeature.DefaultFeatureGate.Enabled(features.PodSchedulingReadiness) && len(pod.Spec.SchedulingGates) != 0 { + return nil, fmt.Errorf("pod %v has non-empty .spec.schedulingGates", pod.Name) + } pod.Spec.NodeName = machine if pod.Annotations == nil { pod.Annotations = make(map[string]string) diff --git a/pkg/registry/core/pod/storage/storage_test.go b/pkg/registry/core/pod/storage/storage_test.go index 4f671e2c48b..1e15ca8a834 100644 --- a/pkg/registry/core/pod/storage/storage_test.go +++ b/pkg/registry/core/pod/storage/storage_test.go @@ -18,6 +18,7 @@ package storage import ( "context" + goerrors "errors" "fmt" "net/url" "strings" @@ -41,7 +42,10 @@ import ( apiserverstorage "k8s.io/apiserver/pkg/storage" storeerr "k8s.io/apiserver/pkg/storage/errors" etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" api "k8s.io/kubernetes/pkg/apis/core" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/registry/registrytest" "k8s.io/kubernetes/pkg/securitycontext" ) @@ -745,6 +749,77 @@ func TestEtcdCreateWithConflict(t *testing.T) { } } +func TestEtcdCreateWithSchedulingGates(t *testing.T) { + tests := []struct { + name string + featureEnabled bool + schedulingGates []api.PodSchedulingGate + wantErr error + }{ + { + name: "pod with non-nil schedulingGates, feature disabled", + featureEnabled: false, + schedulingGates: []api.PodSchedulingGate{ + {Name: "foo"}, + {Name: "bar"}, + }, + wantErr: nil, + }, + { + name: "pod with non-nil schedulingGates, feature enabled", + featureEnabled: true, + schedulingGates: []api.PodSchedulingGate{ + {Name: "foo"}, + {Name: "bar"}, + }, + wantErr: 
goerrors.New(`Operation cannot be fulfilled on pods/binding "foo": pod foo has non-empty .spec.schedulingGates`), + }, + { + name: "pod with nil schedulingGates, feature disabled", + featureEnabled: false, + schedulingGates: nil, + wantErr: nil, + }, + { + name: "pod with nil schedulingGates, feature enabled", + featureEnabled: true, + schedulingGates: nil, + wantErr: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodSchedulingReadiness, tt.featureEnabled)() + storage, bindingStorage, _, server := newStorage(t) + defer server.Terminate(t) + defer storage.Store.DestroyFunc() + ctx := genericapirequest.NewDefaultContext() + + pod := validNewPod() + pod.Spec.SchedulingGates = tt.schedulingGates + if _, err := storage.Create(ctx, pod, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + _, err := bindingStorage.Create(ctx, "foo", &api.Binding{ + ObjectMeta: metav1.ObjectMeta{Namespace: metav1.NamespaceDefault, Name: "foo"}, + Target: api.ObjectReference{Name: "machine"}, + }, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}) + if tt.wantErr == nil { + if err != nil { + t.Errorf("Want nil err, but got %v", err) + } + } else { + if err == nil { + t.Errorf("Want %v, but got nil err", tt.wantErr) + } else if tt.wantErr.Error() != err.Error() { + t.Errorf("Want %v, but got %v", tt.wantErr, err) + } + } + }) + } +} + func validNewBinding() *api.Binding { return &api.Binding{ ObjectMeta: metav1.ObjectMeta{Name: "foo"}, diff --git a/pkg/registry/core/pod/strategy.go b/pkg/registry/core/pod/strategy.go index 5a50105cadc..4021b4eb4fb 100644 --- a/pkg/registry/core/pod/strategy.go +++ b/pkg/registry/core/pod/strategy.go @@ -38,12 +38,14 @@ import ( "k8s.io/apiserver/pkg/registry/generic" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/names" + utilfeature 
"k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/api/legacyscheme" podutil "k8s.io/kubernetes/pkg/api/pod" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/apis/core/helper/qos" "k8s.io/kubernetes/pkg/apis/core/validation" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/client" proxyutil "k8s.io/kubernetes/pkg/proxy/util" "sigs.k8s.io/structured-merge-diff/v4/fieldpath" @@ -87,6 +89,7 @@ func (podStrategy) PrepareForCreate(ctx context.Context, obj runtime.Object) { podutil.DropDisabledPodFields(pod, nil) applySeccompVersionSkew(pod) + applyWaitingForSchedulingGatesCondition(pod) } // PrepareForUpdate clears fields that are not allowed to be set by end users on update. @@ -642,6 +645,29 @@ func validateContainer(container string, pod *api.Pod) (string, error) { return container, nil } +// applyWaitingForSchedulingGatesCondition adds a {type:PodScheduled, reason:SchedulingGated} condition +// to a newly created Pod if necessary. +func applyWaitingForSchedulingGatesCondition(pod *api.Pod) { + if !utilfeature.DefaultFeatureGate.Enabled(features.PodSchedulingReadiness) || + len(pod.Spec.SchedulingGates) == 0 { + return + } + + // If a condition with type PodScheduled already exists, return. 
+ for _, condition := range pod.Status.Conditions { + if condition.Type == api.PodScheduled { + return + } + } + + pod.Status.Conditions = append(pod.Status.Conditions, api.PodCondition{ + Type: api.PodScheduled, + Status: api.ConditionFalse, + Reason: api.PodReasonSchedulingGated, + Message: "Scheduling is blocked due to non-empty scheduling gates", + }) +} + // applySeccompVersionSkew implements the version skew behavior described in: // https://github.com/kubernetes/enhancements/tree/master/keps/sig-node/135-seccomp#version-skew-strategy // Note that we dropped copying the field to annotation synchronization in diff --git a/pkg/registry/core/pod/strategy_test.go b/pkg/registry/core/pod/strategy_test.go index a6e4a530eb7..e8896c77216 100644 --- a/pkg/registry/core/pod/strategy_test.go +++ b/pkg/registry/core/pod/strategy_test.go @@ -35,9 +35,12 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/cache" + featuregatetesting "k8s.io/component-base/featuregate/testing" apitesting "k8s.io/kubernetes/pkg/api/testing" api "k8s.io/kubernetes/pkg/apis/core" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/client" utilpointer "k8s.io/utils/pointer" @@ -261,6 +264,72 @@ func TestGetPodQOS(t *testing.T) { } } +func TestWaitingForGatesCondition(t *testing.T) { + tests := []struct { + name string + pod *api.Pod + featureEnabled bool + want api.PodCondition + }{ + { + name: "pod without .spec.schedulingGates, feature disabled", + pod: &api.Pod{}, + featureEnabled: false, + want: api.PodCondition{}, + }, + { + name: "pod without .spec.schedulingGates, feature enabled", + pod: &api.Pod{}, + featureEnabled: true, + want: api.PodCondition{}, + }, + { + name: "pod with .spec.schedulingGates, feature disabled", + pod: &api.Pod{ + Spec: api.PodSpec{ + SchedulingGates: []api.PodSchedulingGate{{Name: "foo"}}, + 
}, + }, + featureEnabled: false, + want: api.PodCondition{}, + }, + { + name: "pod with .spec.schedulingGates, feature enabled", + pod: &api.Pod{ + Spec: api.PodSpec{ + SchedulingGates: []api.PodSchedulingGate{{Name: "foo"}}, + }, + }, + featureEnabled: true, + want: api.PodCondition{ + Type: api.PodScheduled, + Status: api.ConditionFalse, + Reason: api.PodReasonSchedulingGated, + Message: "Scheduling is blocked due to non-empty scheduling gates", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodSchedulingReadiness, tt.featureEnabled)() + + Strategy.PrepareForCreate(genericapirequest.NewContext(), tt.pod) + var got api.PodCondition + for _, condition := range tt.pod.Status.Conditions { + if condition.Type == api.PodScheduled { + got = condition + break + } + } + + if diff := cmp.Diff(tt.want, got); diff != "" { + t.Errorf("unexpected field errors (-want, +got):\n%s", diff) + } + }) + } +} + func TestCheckGracefulDelete(t *testing.T) { defaultGracePeriod := int64(30) tcs := []struct { diff --git a/staging/src/k8s.io/api/core/v1/types.go b/staging/src/k8s.io/api/core/v1/types.go index adfad114e4a..7b03410409a 100644 --- a/staging/src/k8s.io/api/core/v1/types.go +++ b/staging/src/k8s.io/api/core/v1/types.go @@ -1663,7 +1663,7 @@ type ServiceAccountTokenProjection struct { // must identify itself with an identifier specified in the audience of the // token, and otherwise should reject the token. The audience defaults to the // identifier of the apiserver. - //+optional + // +optional Audience string `json:"audience,omitempty" protobuf:"bytes,1,rep,name=audience"` // expirationSeconds is the requested duration of validity of the service // account token. 
As the token approaches expiration, the kubelet volume @@ -1671,7 +1671,7 @@ type ServiceAccountTokenProjection struct { // start trying to rotate the token if the token is older than 80 percent of // its time to live or if the token is older than 24 hours.Defaults to 1 hour // and must be at least 10 minutes. - //+optional + // +optional ExpirationSeconds *int64 `json:"expirationSeconds,omitempty" protobuf:"varint,2,opt,name=expirationSeconds"` // path is the path relative to the mount point of the file to project the // token into. @@ -2666,6 +2666,10 @@ const ( // can't schedule the pod right now, for example due to insufficient resources in the cluster. PodReasonUnschedulable = "Unschedulable" + // PodReasonSchedulingGated reason in PodScheduled PodCondition means that the scheduler + // skips scheduling the pod because one or more scheduling gates are still present. + PodReasonSchedulingGated = "SchedulingGated" + // PodReasonSchedulerError reason in PodScheduled PodCondition means that some internal error happens // during scheduling, for example due to nodeAffinity parsing errors. PodReasonSchedulerError = "SchedulerError" @@ -2743,7 +2747,7 @@ const ( // by the node selector terms. // +structType=atomic type NodeSelector struct { - //Required. A list of node selector terms. The terms are ORed. + // Required. A list of node selector terms. The terms are ORed. NodeSelectorTerms []NodeSelectorTerm `json:"nodeSelectorTerms" protobuf:"bytes,1,rep,name=nodeSelectorTerms"` } @@ -3327,6 +3331,16 @@ type PodSpec struct { // +k8s:conversion-gen=false // +optional HostUsers *bool `json:"hostUsers,omitempty" protobuf:"bytes,37,opt,name=hostUsers"` + // SchedulingGates is an opaque list of values that if specified will block scheduling the pod. + // More info: https://git.k8s.io/enhancements/keps/sig-scheduling/3521-pod-scheduling-readiness. + // + // This is an alpha-level feature enabled by PodSchedulingReadiness feature gate. 
+ // +optional + // +patchMergeKey=name + // +patchStrategy=merge + // +listType=map + // +listMapKey=name + SchedulingGates []PodSchedulingGate `json:"schedulingGates,omitempty" patchStrategy:"merge" patchMergeKey:"name" protobuf:"bytes,38,opt,name=schedulingGates"` } // OSName is the set of OS'es that can be used in OS. @@ -3347,6 +3361,13 @@ type PodOS struct { Name OSName `json:"name" protobuf:"bytes,1,opt,name=name"` } +// PodSchedulingGate is associated with a Pod to guard its scheduling. +type PodSchedulingGate struct { + // Name of the scheduling gate. + // Each scheduling gate must have a unique name field. + Name string `json:"name" protobuf:"bytes,1,opt,name=name"` +} + // +enum type UnsatisfiableConstraintAction string @@ -4526,7 +4547,7 @@ type ServiceSpec struct { SessionAffinityConfig *SessionAffinityConfig `json:"sessionAffinityConfig,omitempty" protobuf:"bytes,14,opt,name=sessionAffinityConfig"` // TopologyKeys is tombstoned to show why 16 is reserved protobuf tag. - //TopologyKeys []string `json:"topologyKeys,omitempty" protobuf:"bytes,16,opt,name=topologyKeys"` + // TopologyKeys []string `json:"topologyKeys,omitempty" protobuf:"bytes,16,opt,name=topologyKeys"` // IPFamily is tombstoned to show why 15 is a reserved protobuf tag. // IPFamily *IPFamily `json:"ipFamily,omitempty" protobuf:"bytes,15,opt,name=ipFamily,Configcasttype=IPFamily"` diff --git a/test/e2e/framework/pod/wait_test.go b/test/e2e/framework/pod/wait_test.go index 5939fba6ab5..3cae475458d 100644 --- a/test/e2e/framework/pod/wait_test.go +++ b/test/e2e/framework/pod/wait_test.go @@ -184,6 +184,7 @@ INFO: Unexpected error: wait for pod pending-pod running: SetHostnameAsFQDN: nil, OS: nil, HostUsers: nil, + SchedulingGates: nil, }, Status: { Phase: "",