Merge pull request #59281 from bsalamat/nominated_node

Automatic merge from submit-queue (batch tested with PRs 59010, 59212, 59281, 59014, 59297). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Replace nominateNodeName annotation with PodStatus.NominatedNodeName

**What this PR does / why we need it**:
Replaces the nominateNodeName annotation with PodStatus.NominatedNodeName in the scheduler's logic. We don't expect any logic/behavior changes.

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #

**Special notes for your reviewer**:

**Release note**:

```release-note
NONE
```

ref #57471

/sig scheduling
cc: @k82cn @aveshagarwal @resouer
This commit is contained in:
Kubernetes Submit Queue 2018-02-07 15:27:43 -08:00 committed by GitHub
commit 5a4b160cf0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 48 additions and 82 deletions

View File

@ -61,11 +61,6 @@ var ErrNoNodesAvailable = fmt.Errorf("no nodes available to schedule pods")
const (
NoNodeAvailableMsg = "0/%v nodes are available"
// NominatedNodeAnnotationKey is used to annotate a pod that has preempted other pods.
// The scheduler uses the annotation to find that the pod shouldn't preempt more pods
// when it gets to the head of scheduling queue again.
// See podEligibleToPreemptOthers() for more information.
NominatedNodeAnnotationKey = "scheduler.kubernetes.io/nominated-node-name"
)
// Error returns detailed information of why the pod failed to fit on each node
@ -1015,8 +1010,9 @@ func nodesWherePreemptionMightHelp(pod *v1.Pod, nodes []*v1.Node, failedPredicat
// We look at the node that is nominated for this pod and as long as there are
// terminating pods on the node, we don't consider this for preempting more pods.
func podEligibleToPreemptOthers(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) bool {
if nodeName, found := pod.Annotations[NominatedNodeAnnotationKey]; found {
if nodeInfo, found := nodeNameToInfo[nodeName]; found {
nomNodeName := pod.Status.NominatedNodeName
if len(nomNodeName) > 0 {
if nodeInfo, found := nodeNameToInfo[nomNodeName]; found {
for _, p := range nodeInfo.Pods() {
if p.DeletionTimestamp != nil && util.GetPodPriority(p) < util.GetPodPriority(pod) {
// There is a terminating pod on the nominated node.

View File

@ -1318,8 +1318,7 @@ func TestPreempt(t *testing.T) {
// Mark the victims for deletion and record the preemptor's nominated node name.
now := metav1.Now()
victim.DeletionTimestamp = &now
test.pod.Annotations = make(map[string]string)
test.pod.Annotations[NominatedNodeAnnotationKey] = node.Name
test.pod.Status.NominatedNodeName = node.Name
}
// Call preempt again and make sure it doesn't preempt any more pods.
node, victims, _, err = scheduler.Preempt(test.pod, schedulertesting.FakeNodeLister(makeNodeList(nodeNames)), error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap}))

View File

@ -397,10 +397,8 @@ func (p *PriorityQueue) WaitingPodsForNode(nodeName string) []*v1.Pod {
pods := p.unschedulableQ.GetPodsWaitingForNode(nodeName)
for _, obj := range p.activeQ.List() {
pod := obj.(*v1.Pod)
if pod.Annotations != nil {
if n, ok := pod.Annotations[NominatedNodeAnnotationKey]; ok && n == nodeName {
pods = append(pods, pod)
}
if pod.Status.NominatedNodeName == nodeName {
pods = append(pods, pod)
}
}
return pods
@ -420,11 +418,7 @@ type UnschedulablePodsMap struct {
var _ = UnschedulablePods(&UnschedulablePodsMap{})
func NominatedNodeName(pod *v1.Pod) string {
nominatedNodeName, ok := pod.Annotations[NominatedNodeAnnotationKey]
if !ok {
return ""
}
return nominatedNodeName
return pod.Status.NominatedNodeName
}
// Add adds a pod to the unschedulable pods.

View File

@ -41,19 +41,22 @@ var highPriorityPod, medPriorityPod, unschedulablePod = v1.Pod{
Name: "mpp",
Namespace: "ns2",
Annotations: map[string]string{
NominatedNodeAnnotationKey: "node1", "annot2": "val2",
"annot2": "val2",
},
},
Spec: v1.PodSpec{
Priority: &mediumPriority,
},
Status: v1.PodStatus{
NominatedNodeName: "node1",
},
},
v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "up",
Namespace: "ns1",
Annotations: map[string]string{
NominatedNodeAnnotationKey: "node1", "annot2": "val2",
"annot2": "val2",
},
},
Spec: v1.PodSpec{
@ -67,6 +70,7 @@ var highPriorityPod, medPriorityPod, unschedulablePod = v1.Pod{
Reason: v1.PodReasonUnschedulable,
},
},
NominatedNodeName: "node1",
},
}
@ -217,9 +221,12 @@ func TestUnschedulablePodsMap(t *testing.T) {
Name: "p0",
Namespace: "ns1",
Annotations: map[string]string{
NominatedNodeAnnotationKey: "node1", "annot2": "val2",
"annot1": "val1",
},
},
Status: v1.PodStatus{
NominatedNodeName: "node1",
},
},
{
ObjectMeta: metav1.ObjectMeta{
@ -235,27 +242,30 @@ func TestUnschedulablePodsMap(t *testing.T) {
Name: "p2",
Namespace: "ns2",
Annotations: map[string]string{
NominatedNodeAnnotationKey: "node3", "annot2": "val2", "annot3": "val3",
"annot2": "val2", "annot3": "val3",
},
},
Status: v1.PodStatus{
NominatedNodeName: "node3",
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "p3",
Namespace: "ns4",
Annotations: map[string]string{
NominatedNodeAnnotationKey: "node1",
},
},
Status: v1.PodStatus{
NominatedNodeName: "node1",
},
},
}
var updatedPods = make([]*v1.Pod, len(pods))
updatedPods[0] = pods[0].DeepCopy()
updatedPods[0].Annotations[NominatedNodeAnnotationKey] = "node3"
updatedPods[0].Status.NominatedNodeName = "node3"
updatedPods[1] = pods[1].DeepCopy()
updatedPods[1].Annotations[NominatedNodeAnnotationKey] = "node3"
updatedPods[1].Status.NominatedNodeName = "node3"
updatedPods[3] = pods[3].DeepCopy()
delete(updatedPods[3].Annotations, NominatedNodeAnnotationKey)
updatedPods[3].Status.NominatedNodeName = ""
tests := []struct {
podsToAdd []*v1.Pod

View File

@ -33,7 +33,6 @@ go_library(
"//vendor/k8s.io/api/policy/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",

View File

@ -19,7 +19,6 @@ limitations under the License.
package factory
import (
"encoding/json"
"fmt"
"reflect"
"time"
@ -30,7 +29,6 @@ import (
"k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
@ -1331,41 +1329,16 @@ func (p *podPreemptor) DeletePod(pod *v1.Pod) error {
return p.Client.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{})
}
func (p *podPreemptor) UpdatePodAnnotations(pod *v1.Pod, annotations map[string]string) error {
func (p *podPreemptor) SetNominatedNodeName(pod *v1.Pod, nominatedNodeName string) error {
podCopy := pod.DeepCopy()
if podCopy.Annotations == nil {
podCopy.Annotations = map[string]string{}
}
for k, v := range annotations {
podCopy.Annotations[k] = v
}
ret := &unstructured.Unstructured{}
ret.SetAnnotations(podCopy.Annotations)
patchData, err := json.Marshal(ret)
if err != nil {
return err
}
_, error := p.Client.CoreV1().Pods(podCopy.Namespace).Patch(podCopy.Name, types.MergePatchType, patchData, "status")
return error
podCopy.Status.NominatedNodeName = nominatedNodeName
_, err := p.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(podCopy)
return err
}
func (p *podPreemptor) RemoveNominatedNodeAnnotation(pod *v1.Pod) error {
podCopy := pod.DeepCopy()
if podCopy.Annotations == nil {
func (p *podPreemptor) RemoveNominatedNodeName(pod *v1.Pod) error {
if len(pod.Status.NominatedNodeName) == 0 {
return nil
}
if _, exists := podCopy.Annotations[core.NominatedNodeAnnotationKey]; !exists {
return nil
}
// Note: Deleting the entry from the annotations and passing it to Patch() will
// not remove the annotation. That's why we set it to empty string.
podCopy.Annotations[core.NominatedNodeAnnotationKey] = ""
ret := &unstructured.Unstructured{}
ret.SetAnnotations(podCopy.Annotations)
patchData, err := json.Marshal(ret)
if err != nil {
return err
}
_, error := p.Client.CoreV1().Pods(podCopy.Namespace).Patch(podCopy.Name, types.MergePatchType, patchData, "status")
return error
return p.SetNominatedNodeName(pod, "")
}

View File

@ -57,8 +57,8 @@ type PodConditionUpdater interface {
type PodPreemptor interface {
GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error)
DeletePod(pod *v1.Pod) error
UpdatePodAnnotations(pod *v1.Pod, annots map[string]string) error
RemoveNominatedNodeAnnotation(pod *v1.Pod) error
SetNominatedNodeName(pod *v1.Pod, nominatedNode string) error
RemoveNominatedNodeName(pod *v1.Pod) error
}
// Scheduler watches for new unscheduled pods. It attempts to find
@ -226,8 +226,7 @@ func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, e
var nodeName = ""
if node != nil {
nodeName = node.Name
annotations := map[string]string{core.NominatedNodeAnnotationKey: nodeName}
err = sched.config.PodPreemptor.UpdatePodAnnotations(preemptor, annotations)
err = sched.config.PodPreemptor.SetNominatedNodeName(preemptor, nodeName)
if err != nil {
glog.Errorf("Error in preemption process. Cannot update pod %v annotations: %v", preemptor.Name, err)
return "", err
@ -245,7 +244,7 @@ func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, e
// but preemption logic does not find any node for it. In that case Preempt()
// function of generic_scheduler.go returns the pod itself for removal of the annotation.
for _, p := range nominatedPodsToClear {
rErr := sched.config.PodPreemptor.RemoveNominatedNodeAnnotation(p)
rErr := sched.config.PodPreemptor.RemoveNominatedNodeName(p)
if rErr != nil {
glog.Errorf("Cannot remove nominated node annotation of pod: %v", rErr)
// We do not return as this error is not critical.

View File

@ -65,11 +65,11 @@ func (fp fakePodPreemptor) DeletePod(pod *v1.Pod) error {
return nil
}
func (fp fakePodPreemptor) UpdatePodAnnotations(pod *v1.Pod, annots map[string]string) error {
func (fp fakePodPreemptor) SetNominatedNodeName(pod *v1.Pod, nomNodeName string) error {
return nil
}
func (fp fakePodPreemptor) RemoveNominatedNodeAnnotation(pod *v1.Pod) error {
func (fp fakePodPreemptor) RemoveNominatedNodeName(pod *v1.Pod) error {
return nil
}

View File

@ -39,7 +39,6 @@ go_test(
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithmprovider:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/core:go_default_library",
"//pkg/scheduler/factory:go_default_library",
"//pkg/scheduler/schedulercache:go_default_library",
"//plugin/pkg/admission/podtolerationrestriction:go_default_library",

View File

@ -34,7 +34,6 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/features"
_ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
"k8s.io/kubernetes/pkg/scheduler/core"
testutils "k8s.io/kubernetes/test/utils"
"github.com/golang/glog"
@ -42,14 +41,13 @@ import (
var lowPriority, mediumPriority, highPriority = int32(100), int32(200), int32(300)
func waitForNominatedNodeAnnotation(cs clientset.Interface, pod *v1.Pod) error {
func waitForNominatedNodeName(cs clientset.Interface, pod *v1.Pod) error {
if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
pod, err := cs.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
annot, found := pod.Annotations[core.NominatedNodeAnnotationKey]
if found && len(annot) > 0 {
if len(pod.Status.NominatedNodeName) > 0 {
return true, nil
}
return false, err
@ -276,7 +274,7 @@ func TestPreemption(t *testing.T) {
}
// Also check that the preemptor pod gets the annotation for nominated node name.
if len(test.preemptedPodIndexes) > 0 {
if err := waitForNominatedNodeAnnotation(cs, preemptor); err != nil {
if err := waitForNominatedNodeName(cs, preemptor); err != nil {
t.Errorf("Test [%v]: NominatedNodeName annotation was not set for pod %v: %v", test.description, preemptor.Name, err)
}
}
@ -389,7 +387,7 @@ func TestPreemptionStarvation(t *testing.T) {
t.Errorf("Error while creating the preempting pod: %v", err)
}
// Check that the preemptor pod gets the annotation for nominated node name.
if err := waitForNominatedNodeAnnotation(cs, preemptor); err != nil {
if err := waitForNominatedNodeName(cs, preemptor); err != nil {
t.Errorf("Test [%v]: NominatedNodeName annotation was not set for pod %v: %v", test.description, preemptor.Name, err)
}
// Make sure that preemptor is scheduled after preemptions.
@ -462,7 +460,7 @@ func TestNominatedNodeCleanUp(t *testing.T) {
t.Errorf("Error while creating the medium priority pod: %v", err)
}
// Step 3. Check that nominated node name of the medium priority pod is set.
if err := waitForNominatedNodeAnnotation(cs, medPriPod); err != nil {
if err := waitForNominatedNodeName(cs, medPriPod); err != nil {
t.Errorf("NominatedNodeName annotation was not set for pod %v: %v", medPriPod.Name, err)
}
// Step 4. Create a high priority pod.
@ -480,7 +478,7 @@ func TestNominatedNodeCleanUp(t *testing.T) {
t.Errorf("Error while creating the high priority pod: %v", err)
}
// Step 5. Check that nominated node name of the high priority pod is set.
if err := waitForNominatedNodeAnnotation(cs, highPriPod); err != nil {
if err := waitForNominatedNodeName(cs, highPriPod); err != nil {
t.Errorf("NominatedNodeName annotation was not set for pod %v: %v", medPriPod.Name, err)
}
// And the nominated node name of the medium priority pod is cleared.
@ -489,8 +487,7 @@ func TestNominatedNodeCleanUp(t *testing.T) {
if err != nil {
t.Errorf("Error getting the medium priority pod info: %v", err)
}
n, found := pod.Annotations[core.NominatedNodeAnnotationKey]
if !found || len(n) == 0 {
if len(pod.Status.NominatedNodeName) == 0 {
return true, nil
}
return false, err
@ -755,7 +752,7 @@ func TestPDBInPreemption(t *testing.T) {
}
// Also check that the preemptor pod gets the annotation for nominated node name.
if len(test.preemptedPodIndexes) > 0 {
if err := waitForNominatedNodeAnnotation(cs, preemptor); err != nil {
if err := waitForNominatedNodeName(cs, preemptor); err != nil {
t.Errorf("Test [%v]: NominatedNodeName annotation was not set for pod %v: %v", test.description, preemptor.Name, err)
}
}