diff --git a/pkg/scheduler/BUILD b/pkg/scheduler/BUILD index 290c585c4ad..b58321c92e9 100644 --- a/pkg/scheduler/BUILD +++ b/pkg/scheduler/BUILD @@ -27,16 +27,15 @@ go_library( "//pkg/scheduler/internal/queue:go_default_library", "//pkg/scheduler/metrics:go_default_library", "//pkg/scheduler/profile:go_default_library", + "//pkg/scheduler/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/storage/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", diff --git a/pkg/scheduler/core/BUILD b/pkg/scheduler/core/BUILD index b28de9d99b3..33ef43cf157 100644 --- a/pkg/scheduler/core/BUILD +++ b/pkg/scheduler/core/BUILD @@ -73,6 +73,8 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", + "//staging/src/k8s.io/client-go/testing:go_default_library", + "//staging/src/k8s.io/client-go/tools/events:go_default_library", "//staging/src/k8s.io/kube-scheduler/extender/v1:go_default_library", ], ) diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index a3666b935d7..cceb836593c 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -100,11 +100,10 @@ func (f *FitError) Error() string { // TODO: Rename this type. type ScheduleAlgorithm interface { Schedule(context.Context, *profile.Profile, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error) - // Preempt receives scheduling errors for a pod and tries to create room for + // Preempt receives scheduling filter result (NodeToStatusMap) for a pod and tries to create room for // the pod by preempting lower priority pods if possible. - // It returns the node where preemption happened, a list of preempted pods, a - // list of pods whose nominated node name should be removed, and error if any. - Preempt(context.Context, *profile.Profile, *framework.CycleState, *v1.Pod, error) (selectedNode string, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error) + // It returns the node where preemption happened, and error if any. + Preempt(context.Context, *profile.Profile, *framework.CycleState, *v1.Pod, framework.NodeToStatusMap) (selectedNode string, err error) // Extenders returns a slice of extender config. This is exposed for // testing. Extenders() []framework.Extender @@ -249,29 +248,35 @@ func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (st // other pods with the same priority. The nominated pod prevents other pods from // using the nominated resources and the nominated pod could take a long time // before it is retried after many other pending pods. 
-func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (string, []*v1.Pod, []*v1.Pod, error) { - // Scheduler may return various types of errors. Consider preemption only if - // the error is of type FitError. - fitError, ok := scheduleErr.(*FitError) - if !ok || fitError == nil { - return "", nil, nil, nil +func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (string, error) { + cs := prof.ClientSet() + // TODO(Huang-Wei): get pod from informer cache instead of API server. + pod, err := util.GetUpdatedPod(cs, pod) + if err != nil { + klog.Errorf("Error getting the updated preemptor pod object: %v", err) + return "", err } + if !podEligibleToPreemptOthers(pod, g.nodeInfoSnapshot.NodeInfos()) { klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name) - return "", nil, nil, nil + return "", nil } allNodes, err := g.nodeInfoSnapshot.NodeInfos().List() if err != nil { - return "", nil, nil, err + return "", err } if len(allNodes) == 0 { - return "", nil, nil, ErrNoNodesAvailable + return "", ErrNoNodesAvailable } - potentialNodes := nodesWherePreemptionMightHelp(allNodes, fitError) + potentialNodes := nodesWherePreemptionMightHelp(allNodes, m) if len(potentialNodes) == 0 { klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name) // In this case, we should clean-up any existing nominated node name of the pod. - return "", nil, []*v1.Pod{pod}, nil + if err := util.ClearNominatedNodeName(cs, pod); err != nil { + klog.Errorf("Cannot clear 'NominatedNodeName' field of pod %v/%v: %v", pod.Namespace, pod.Name, err) + // We do not return as this error is not critical. + } + return "", nil } if klog.V(5).Enabled() { var sample []string @@ -284,12 +289,12 @@ func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, s if g.pdbLister != nil { pdbs, err = g.pdbLister.List(labels.Everything()) if err != nil { - return "", nil, nil, err + return "", err } } nodeNameToVictims, err := selectNodesForPreemption(ctx, prof, g.podNominator, state, pod, potentialNodes, pdbs) if err != nil { - return "", nil, nil, err + return "", err } // We will only check nodeNameToVictims with extenders that support preemption. @@ -297,20 +302,39 @@ func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, s // node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles. 
nodeNameToVictims, err = g.processPreemptionWithExtenders(pod, nodeNameToVictims) if err != nil { - return "", nil, nil, err + return "", err } candidateNode := pickOneNodeForPreemption(nodeNameToVictims) if len(candidateNode) == 0 { - return "", nil, nil, nil + return "", nil } + victims := nodeNameToVictims[candidateNode].Pods + for _, victim := range victims { + if err := util.DeletePod(cs, victim); err != nil { + klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err) + return "", err + } + // If the victim is a WaitingPod, send a reject message to the PermitPlugin + if waitingPod := prof.GetWaitingPod(victim.UID); waitingPod != nil { + waitingPod.Reject("preempted") + } + prof.Recorder.Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", pod.Namespace, pod.Name, candidateNode) + } + metrics.PreemptionVictims.Observe(float64(len(victims))) + // Lower priority pods nominated to run on this node, may no longer fit on // this node. So, we should remove their nomination. Removing their // nomination updates these pods and moves them to the active queue. It // lets scheduler find another place for them. nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode) - return candidateNode, nodeNameToVictims[candidateNode].Pods, nominatedPods, nil + if err := util.ClearNominatedNodeName(cs, nominatedPods...); err != nil { + klog.Errorf("Cannot clear 'NominatedNodeName' field: %v", err) + // We do not return as this error is not critical. + } + + return candidateNode, nil } // processPreemptionWithExtenders processes preemption with extenders @@ -1041,13 +1065,13 @@ func selectVictimsOnNode( // nodesWherePreemptionMightHelp returns a list of nodes with failed predicates // that may be satisfied by removing pods from the node. -func nodesWherePreemptionMightHelp(nodes []*framework.NodeInfo, fitErr *FitError) []*framework.NodeInfo { +func nodesWherePreemptionMightHelp(nodes []*framework.NodeInfo, m framework.NodeToStatusMap) []*framework.NodeInfo { var potentialNodes []*framework.NodeInfo for _, node := range nodes { name := node.Node().Name // We reply on the status by each plugin - 'Unschedulable' or 'UnschedulableAndUnresolvable' // to determine whether preemption may help or not on the node. 
- if fitErr.FilteredNodesStatuses[name].Code() == framework.UnschedulableAndUnresolvable { + if m[name].Code() == framework.UnschedulableAndUnresolvable { continue } potentialNodes = append(potentialNodes, node) diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go index ecd9950a7cb..1c159ef3031 100644 --- a/pkg/scheduler/core/generic_scheduler_test.go +++ b/pkg/scheduler/core/generic_scheduler_test.go @@ -37,6 +37,8 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" clientsetfake "k8s.io/client-go/kubernetes/fake" + clienttesting "k8s.io/client-go/testing" + "k8s.io/client-go/tools/events" extenderv1 "k8s.io/kube-scheduler/extender/v1" volumescheduling "k8s.io/kubernetes/pkg/controller/volume/scheduling" schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" @@ -2021,16 +2023,13 @@ func TestNodesWherePreemptionMightHelp(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - fitErr := FitError{ - FilteredNodesStatuses: test.nodesStatuses, - } var nodeInfos []*framework.NodeInfo for _, n := range makeNodeList(nodeNames) { ni := framework.NewNodeInfo() ni.SetNode(n) nodeInfos = append(nodeInfos, ni) } - nodes := nodesWherePreemptionMightHelp(nodeInfos, &fitErr) + nodes := nodesWherePreemptionMightHelp(nodeInfos, test.nodesStatuses) if len(test.expected) != len(nodes) { t.Errorf("number of nodes is not the same as expected. exptectd: %d, got: %d. Nodes: %v", len(test.expected), len(nodes), nodes) } @@ -2359,7 +2358,13 @@ func TestPreempt(t *testing.T) { labelKeys := []string{"hostname", "zone", "region"} for _, test := range tests { t.Run(test.name, func(t *testing.T) { - client := clientsetfake.NewSimpleClientset() + apiObjs := mergeObjs(test.pod, test.pods) + client := clientsetfake.NewSimpleClientset(apiObjs...) 
+ deletedPodNames := make(sets.String) + client.PrependReactor("delete", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) { + deletedPodNames.Insert(action.(clienttesting.DeleteAction).GetName()) + return true, nil, nil + }) informerFactory := informers.NewSharedInformerFactory(client, 0) stop := make(chan struct{}) @@ -2399,11 +2404,18 @@ func TestPreempt(t *testing.T) { } snapshot := internalcache.NewSnapshot(test.pods, nodes) - fwk, err := st.NewFramework(test.registerPlugins, framework.WithSnapshotSharedLister(snapshot)) + fwk, err := st.NewFramework( + test.registerPlugins, + framework.WithClientSet(client), + framework.WithSnapshotSharedLister(snapshot), + ) if err != nil { t.Fatal(err) } - prof := &profile.Profile{Framework: fwk} + prof := &profile.Profile{ + Framework: fwk, + Recorder: &events.FakeRecorder{}, + } scheduler := NewGenericScheduler( cache, @@ -2425,7 +2437,7 @@ func TestPreempt(t *testing.T) { if test.failedNodeToStatusMap != nil { failedNodeToStatusMap = test.failedNodeToStatusMap } - node, victims, _, err := scheduler.Preempt(context.Background(), prof, state, test.pod, error(&FitError{Pod: test.pod, FilteredNodesStatuses: failedNodeToStatusMap})) + node, err := scheduler.Preempt(context.Background(), prof, state, test.pod, failedNodeToStatusMap) if err != nil { t.Errorf("unexpected error in preemption: %v", err) } @@ -2435,31 +2447,39 @@ func TestPreempt(t *testing.T) { if len(node) == 0 && len(test.expectedNode) != 0 { t.Errorf("expected node: %v, got: nothing", test.expectedNode) } - if len(victims) != len(test.expectedPods) { - t.Errorf("expected %v pods, got %v.", len(test.expectedPods), len(victims)) + if len(deletedPodNames) != len(test.expectedPods) { + t.Errorf("expected %v pods, got %v.", len(test.expectedPods), len(deletedPodNames)) } - for _, victim := range victims { + for victimName := range deletedPodNames { found := false for _, expPod := range test.expectedPods { - if expPod == victim.Name { + if expPod == victimName { found = true break } } if !found { - t.Errorf("pod %v is not expected to be a victim.", victim.Name) + t.Fatalf("pod %v is not expected to be a victim.", victimName) } - // Mark the victims for deletion and record the preemptor's nominated node name. - now := metav1.Now() - victim.DeletionTimestamp = &now - test.pod.Status.NominatedNodeName = node } + test.pod.Status.NominatedNodeName = node + client.CoreV1().Pods(test.pod.Namespace).Update(context.TODO(), test.pod, metav1.UpdateOptions{}) + + // Manually set the deleted Pods' deletionTimestamp to non-nil. + for _, pod := range test.pods { + if deletedPodNames.Has(pod.Name) { + now := metav1.Now() + pod.DeletionTimestamp = &now + deletedPodNames.Delete(pod.Name) + } + } + // Call preempt again and make sure it doesn't preempt any more pods. - node, victims, _, err = scheduler.Preempt(context.Background(), prof, state, test.pod, error(&FitError{Pod: test.pod, FilteredNodesStatuses: failedNodeToStatusMap})) + node, err = scheduler.Preempt(context.Background(), prof, state, test.pod, failedNodeToStatusMap) if err != nil { t.Errorf("unexpected error in preemption: %v", err) } - if len(node) != 0 && len(victims) > 0 { + if len(node) != 0 && len(deletedPodNames) > 0 { t.Errorf("didn't expect any more preemption. 
Node %v is selected for preemption.", node) } close(stop) @@ -2576,3 +2596,14 @@ func nodesToNodeInfos(nodes []*v1.Node, snapshot *internalcache.Snapshot) ([]*fr } return nodeInfos, nil } + +func mergeObjs(pod *v1.Pod, pods []*v1.Pod) []runtime.Object { + var objs []runtime.Object + if pod != nil { + objs = append(objs, pod) + } + for i := range pods { + objs = append(objs, pods[i]) + } + return objs +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index dfbd06a5d8c..c88555528a2 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -18,7 +18,6 @@ package scheduler import ( "context" - "encoding/json" "fmt" "io/ioutil" "math/rand" @@ -28,8 +27,6 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" @@ -46,6 +43,7 @@ import ( internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" "k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/profile" + "k8s.io/kubernetes/pkg/scheduler/util" ) const ( @@ -55,15 +53,6 @@ const ( pluginMetricsSamplePercent = 10 ) -// PodPreemptor has methods needed to delete a pod and to update 'NominatedPod' -// field of the preemptor pod. -// TODO (ahmad-diaa): Remove type and replace it with scheduler methods -type podPreemptor interface { - getUpdatedPod(pod *v1.Pod) (*v1.Pod, error) - deletePod(pod *v1.Pod) error - removeNominatedNodeName(pod *v1.Pod) error -} - // Scheduler watches for new unscheduled pods. It attempts to find // nodes that they fit on and writes bindings back to the api server. type Scheduler struct { @@ -72,9 +61,6 @@ type Scheduler struct { SchedulerCache internalcache.Cache Algorithm core.ScheduleAlgorithm - // PodPreemptor is used to evict pods and update 'NominatedNode' field of - // the preemptor pod. - podPreemptor podPreemptor // NextPod should be a function that blocks until the next pod // is available. We don't use a channel for this, because scheduling @@ -292,7 +278,6 @@ func New(client clientset.Interface, sched.DisablePreemption = options.disablePreemption sched.StopEverything = stopEverything sched.client = client - sched.podPreemptor = &podPreemptorImpl{client} sched.scheduledPodsHasSynced = podInformer.Informer().HasSynced addAllEventHandlers(sched, informerFactory, podInformer) @@ -382,52 +367,7 @@ func updatePod(client clientset.Interface, pod *v1.Pod, condition *v1.PodConditi if nominatedNode != "" { podCopy.Status.NominatedNodeName = nominatedNode } - return patchPod(client, pod, podCopy) -} - -// preempt tries to create room for a pod that has failed to schedule, by preempting lower priority pods if possible. -// If it succeeds, it adds the name of the node where preemption has happened to the pod spec. -// It returns the node name and an error if any. 
-func (sched *Scheduler) preempt(ctx context.Context, prof *profile.Profile, state *framework.CycleState, preemptor *v1.Pod, scheduleErr error) (string, error) { - preemptor, err := sched.podPreemptor.getUpdatedPod(preemptor) - if err != nil { - klog.Errorf("Error getting the updated preemptor pod object: %v", err) - return "", err - } - - nodeName, victims, nominatedPodsToClear, err := sched.Algorithm.Preempt(ctx, prof, state, preemptor, scheduleErr) - if err != nil { - klog.Errorf("Error preempting victims to make room for %v/%v: %v", preemptor.Namespace, preemptor.Name, err) - return "", err - } - if len(nodeName) != 0 { - for _, victim := range victims { - if err := sched.podPreemptor.deletePod(victim); err != nil { - klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err) - return "", err - } - // If the victim is a WaitingPod, send a reject message to the PermitPlugin - if waitingPod := prof.GetWaitingPod(victim.UID); waitingPod != nil { - waitingPod.Reject("preempted") - } - prof.Recorder.Eventf(victim, preemptor, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName) - - } - metrics.PreemptionVictims.Observe(float64(len(victims))) - } - // Clearing nominated pods should happen outside of "if node != nil". Node could - // be nil when a pod with nominated node name is eligible to preempt again, - // but preemption logic does not find any node for it. In that case Preempt() - // function of generic_scheduler.go returns the pod itself for removal of - // the 'NominatedNodeName' field. - for _, p := range nominatedPodsToClear { - rErr := sched.podPreemptor.removeNominatedNodeName(p) - if rErr != nil { - klog.Errorf("Cannot remove 'NominatedNodeName' field of pod: %v", rErr) - // We do not return as this error is not critical. - } - } - return nodeName, err + return util.PatchPod(client, pod, podCopy) } // assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous. @@ -546,7 +486,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { } else { preemptionStartTime := time.Now() // TODO(Huang-Wei): implement the preemption logic as a PostFilter plugin. 
- nominatedNode, _ = sched.preempt(schedulingCycleCtx, prof, state, pod, fitError) + nominatedNode, _ = sched.Algorithm.Preempt(schedulingCycleCtx, prof, state, pod, fitError.FilteredNodesStatuses) metrics.PreemptionAttempts.Inc() metrics.SchedulingAlgorithmPreemptionEvaluationDuration.Observe(metrics.SinceInSeconds(preemptionStartTime)) metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime)) @@ -715,45 +655,6 @@ func (sched *Scheduler) skipPodSchedule(prof *profile.Profile, pod *v1.Pod) bool return false } -type podPreemptorImpl struct { - Client clientset.Interface -} - -func (p *podPreemptorImpl) getUpdatedPod(pod *v1.Pod) (*v1.Pod, error) { - return p.Client.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}) -} - -func (p *podPreemptorImpl) deletePod(pod *v1.Pod) error { - return p.Client.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{}) -} - -func (p *podPreemptorImpl) removeNominatedNodeName(pod *v1.Pod) error { - if len(pod.Status.NominatedNodeName) == 0 { - return nil - } - podCopy := pod.DeepCopy() - podCopy.Status.NominatedNodeName = "" - return patchPod(p.Client, pod, podCopy) -} - -func patchPod(client clientset.Interface, old *v1.Pod, new *v1.Pod) error { - oldData, err := json.Marshal(old) - if err != nil { - return err - } - - newData, err := json.Marshal(new) - if err != nil { - return err - } - patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Pod{}) - if err != nil { - return fmt.Errorf("failed to create merge patch for pod %q/%q: %v", old.Namespace, old.Name, err) - } - _, err = client.CoreV1().Pods(old.Namespace).Patch(context.TODO(), old.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status") - return err -} - func defaultAlgorithmSourceProviderName() *string { provider := schedulerapi.SchedulerDefaultProviderName return &provider diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 4152bf7d313..48de4840167 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -63,20 +63,6 @@ import ( st "k8s.io/kubernetes/pkg/scheduler/testing" ) -type fakePodPreemptor struct{} - -func (fp fakePodPreemptor) getUpdatedPod(pod *v1.Pod) (*v1.Pod, error) { - return pod, nil -} - -func (fp fakePodPreemptor) deletePod(pod *v1.Pod) error { - return nil -} - -func (fp fakePodPreemptor) removeNominatedNodeName(pod *v1.Pod) error { - return nil -} - func podWithID(id, desiredHost string) *v1.Pod { return &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -134,8 +120,8 @@ func (es mockScheduler) Extenders() []framework.Extender { return nil } -func (es mockScheduler) Preempt(ctx context.Context, profile *profile.Profile, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (string, []*v1.Pod, []*v1.Pod, error) { - return "", nil, nil, nil +func (es mockScheduler) Preempt(ctx context.Context, profile *profile.Profile, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (string, error) { + return "", nil } func TestSchedulerCreation(t *testing.T) { @@ -816,9 +802,8 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C Error: func(p *framework.QueuedPodInfo, err error) { errChan <- err }, - Profiles: profiles, - client: client, - podPreemptor: fakePodPreemptor{}, + Profiles: profiles, + client: client, } return sched, bindingChan, errChan @@ -1185,61 +1170,6 @@ func TestSchedulerBinding(t 
*testing.T) { } } -func TestRemoveNominatedNodeName(t *testing.T) { - tests := []struct { - name string - currentNominatedNodeName string - newNominatedNodeName string - expectedPatchRequests int - expectedPatchData string - }{ - { - name: "Should make patch request to clear node name", - currentNominatedNodeName: "node1", - expectedPatchRequests: 1, - expectedPatchData: `{"status":{"nominatedNodeName":null}}`, - }, - { - name: "Should not make patch request if nominated node is already cleared", - currentNominatedNodeName: "", - expectedPatchRequests: 0, - }, - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - actualPatchRequests := 0 - var actualPatchData string - cs := &clientsetfake.Clientset{} - cs.AddReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) { - actualPatchRequests++ - patch := action.(clienttesting.PatchAction) - actualPatchData = string(patch.GetPatch()) - // For this test, we don't care about the result of the patched pod, just that we got the expected - // patch request, so just returning &v1.Pod{} here is OK because scheduler doesn't use the response. - return true, &v1.Pod{}, nil - }) - - pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{Name: "foo"}, - Status: v1.PodStatus{NominatedNodeName: test.currentNominatedNodeName}, - } - - preemptor := &podPreemptorImpl{Client: cs} - if err := preemptor.removeNominatedNodeName(pod); err != nil { - t.Fatalf("Error calling removeNominatedNodeName: %v", err) - } - - if actualPatchRequests != test.expectedPatchRequests { - t.Fatalf("Actual patch requests (%d) dos not equal expected patch requests (%d)", actualPatchRequests, test.expectedPatchRequests) - } - - if test.expectedPatchRequests > 0 && actualPatchData != test.expectedPatchData { - t.Fatalf("Patch data mismatch: Actual was %v, but expected %v", actualPatchData, test.expectedPatchData) - } - }) - } -} - func TestUpdatePod(t *testing.T) { tests := []struct { name string diff --git a/pkg/scheduler/util/BUILD b/pkg/scheduler/util/BUILD index 028ff08a1e2..0b292065e0e 100644 --- a/pkg/scheduler/util/BUILD +++ b/pkg/scheduler/util/BUILD @@ -19,8 +19,11 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/selection:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", + "//staging/src/k8s.io/client-go/testing:go_default_library", "//staging/src/k8s.io/kube-scheduler/extender/v1:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", ], @@ -42,8 +45,12 @@ go_library( "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes:go_default_library", 
"//staging/src/k8s.io/kube-scheduler/extender/v1:go_default_library", "//vendor/k8s.io/klog/v2:go_default_library", ], diff --git a/pkg/scheduler/util/utils.go b/pkg/scheduler/util/utils.go index 76d27520b19..05e822e67d9 100644 --- a/pkg/scheduler/util/utils.go +++ b/pkg/scheduler/util/utils.go @@ -17,10 +17,17 @@ limitations under the License. package util import ( + "context" + "encoding/json" + "fmt" "time" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/strategicpatch" + "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" extenderv1 "k8s.io/kube-scheduler/extender/v1" podutil "k8s.io/kubernetes/pkg/api/v1/pod" @@ -109,3 +116,50 @@ func GetPodAntiAffinityTerms(affinity *v1.Affinity) (terms []v1.PodAffinityTerm) } return terms } + +// PatchPod calculates the delta bytes change from to , +// and then submit a request to API server to patch the pod changes. +func PatchPod(cs kubernetes.Interface, old *v1.Pod, new *v1.Pod) error { + oldData, err := json.Marshal(old) + if err != nil { + return err + } + + newData, err := json.Marshal(new) + if err != nil { + return err + } + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Pod{}) + if err != nil { + return fmt.Errorf("failed to create merge patch for pod %q/%q: %v", old.Namespace, old.Name, err) + } + _, err = cs.CoreV1().Pods(old.Namespace).Patch(context.TODO(), old.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status") + return err +} + +// GetUpdatedPod returns the latest version of from API server. +func GetUpdatedPod(cs kubernetes.Interface, pod *v1.Pod) (*v1.Pod, error) { + return cs.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}) +} + +// DeletePod deletes the given from API server +func DeletePod(cs kubernetes.Interface, pod *v1.Pod) error { + return cs.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{}) +} + +// ClearNominatedNodeName internally submit a patch request to API server +// to set each pods[*].Status.NominatedNodeName> to "". 
+func ClearNominatedNodeName(cs kubernetes.Interface, pods ...*v1.Pod) utilerrors.Aggregate { + var errs []error + for _, p := range pods { + if len(p.Status.NominatedNodeName) == 0 { + continue + } + podCopy := p.DeepCopy() + podCopy.Status.NominatedNodeName = "" + if err := PatchPod(cs, p, podCopy); err != nil { + errs = append(errs, err) + } + } + return utilerrors.NewAggregate(errs) +} diff --git a/pkg/scheduler/util/utils_test.go b/pkg/scheduler/util/utils_test.go index 4c1467da558..0a7e6f5967a 100644 --- a/pkg/scheduler/util/utils_test.go +++ b/pkg/scheduler/util/utils_test.go @@ -23,6 +23,9 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + clientsetfake "k8s.io/client-go/kubernetes/fake" + clienttesting "k8s.io/client-go/testing" extenderv1 "k8s.io/kube-scheduler/extender/v1" ) @@ -114,3 +117,57 @@ func TestMoreImportantPod(t *testing.T) { } } } + +func TestRemoveNominatedNodeName(t *testing.T) { + tests := []struct { + name string + currentNominatedNodeName string + newNominatedNodeName string + expectedPatchRequests int + expectedPatchData string + }{ + { + name: "Should make patch request to clear node name", + currentNominatedNodeName: "node1", + expectedPatchRequests: 1, + expectedPatchData: `{"status":{"nominatedNodeName":null}}`, + }, + { + name: "Should not make patch request if nominated node is already cleared", + currentNominatedNodeName: "", + expectedPatchRequests: 0, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + actualPatchRequests := 0 + var actualPatchData string + cs := &clientsetfake.Clientset{} + cs.AddReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) { + actualPatchRequests++ + patch := action.(clienttesting.PatchAction) + actualPatchData = string(patch.GetPatch()) + // For this test, we don't care about the result of the patched pod, just that we got the expected + // patch request, so just returning &v1.Pod{} here is OK because scheduler doesn't use the response. + return true, &v1.Pod{}, nil + }) + + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Status: v1.PodStatus{NominatedNodeName: test.currentNominatedNodeName}, + } + + if err := ClearNominatedNodeName(cs, pod); err != nil { + t.Fatalf("Error calling ClearNominatedNodeName: %v", err) + } + + if actualPatchRequests != test.expectedPatchRequests { + t.Fatalf("Actual patch requests (%d) does not equal expected patch requests (%d)", actualPatchRequests, test.expectedPatchRequests) + } + + if test.expectedPatchRequests > 0 && actualPatchData != test.expectedPatchData { + t.Fatalf("Patch data mismatch: Actual was %v, but expected %v", actualPatchData, test.expectedPatchData) + } + }) + } +}
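For illustration only (this block is not part of the patch): a minimal sketch of how the relocated ClearNominatedNodeName helper behaves, assuming the package is importable as k8s.io/kubernetes/pkg/scheduler/util; the pod name, namespace, node name, and the schedutil alias below are invented. A fake-clientset reactor echoes the strategic-merge patch the helper sends to the pods/status subresource, mirroring the unit test moved in this diff.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	clientsetfake "k8s.io/client-go/kubernetes/fake"
	clienttesting "k8s.io/client-go/testing"
	schedutil "k8s.io/kubernetes/pkg/scheduler/util"
)

func main() {
	// A preemptor pod carrying a stale nominated node name.
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "preemptor", Namespace: "default"},
		Status:     v1.PodStatus{NominatedNodeName: "node-a"},
	}

	// Fake clientset; the reactor records the strategic-merge patch that
	// ClearNominatedNodeName issues against the pods/status subresource.
	cs := clientsetfake.NewSimpleClientset(pod)
	cs.PrependReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
		fmt.Printf("patch: %s\n", action.(clienttesting.PatchAction).GetPatch())
		return true, &v1.Pod{}, nil
	})

	// Pods whose NominatedNodeName is already empty are skipped; per-pod
	// patch failures are folded into a single aggregate error.
	if err := schedutil.ClearNominatedNodeName(cs, pod); err != nil {
		fmt.Printf("unexpected error: %v\n", err)
	}
	// Prints: patch: {"status":{"nominatedNodeName":null}}
}

Returning an aggregate error rather than failing fast matches how the new Preempt path treats a failed clear as non-critical: it logs the error and continues with the scheduling cycle.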