Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-23 11:50:44 +00:00)

Merge pull request #92009 from Huang-Wei/postfilter-impl-1
[postfilter-impl-1] Refactor scheduler preempt interface

Commit: 84799c47bf
@@ -27,16 +27,15 @@ go_library(
         "//pkg/scheduler/internal/queue:go_default_library",
         "//pkg/scheduler/metrics:go_default_library",
         "//pkg/scheduler/profile:go_default_library",
+        "//pkg/scheduler/util:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/api/storage/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
-        "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
-        "//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//staging/src/k8s.io/client-go/informers:go_default_library",
@@ -73,6 +73,8 @@ go_test(
         "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//staging/src/k8s.io/client-go/informers:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
+        "//staging/src/k8s.io/client-go/testing:go_default_library",
+        "//staging/src/k8s.io/client-go/tools/events:go_default_library",
         "//staging/src/k8s.io/kube-scheduler/extender/v1:go_default_library",
     ],
 )
@@ -100,11 +100,10 @@ func (f *FitError) Error() string {
 // TODO: Rename this type.
 type ScheduleAlgorithm interface {
     Schedule(context.Context, *profile.Profile, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error)
-    // Preempt receives scheduling errors for a pod and tries to create room for
+    // Preempt receives scheduling filter result (NodeToStatusMap) for a pod and tries to create room for
     // the pod by preempting lower priority pods if possible.
-    // It returns the node where preemption happened, a list of preempted pods, a
-    // list of pods whose nominated node name should be removed, and error if any.
-    Preempt(context.Context, *profile.Profile, *framework.CycleState, *v1.Pod, error) (selectedNode string, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
+    // It returns the node where preemption happened, and error if any.
+    Preempt(context.Context, *profile.Profile, *framework.CycleState, *v1.Pod, framework.NodeToStatusMap) (selectedNode string, err error)
     // Extenders returns a slice of extender config. This is exposed for
     // testing.
     Extenders() []framework.Extender
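The hunk above narrows ScheduleAlgorithm.Preempt from four results (selected node, preempted pods, pods whose nomination must be cleared, error) down to just the selected node name and an error, and it now consumes the filter phase's framework.NodeToStatusMap instead of a raw scheduling error. Below is a minimal, self-contained sketch of the narrowed contract; Status, NodeToStatusMap, Preempter and fakePreempter are simplified stand-ins invented for illustration, not the real framework types.

package main

import (
    "context"
    "fmt"
)

// Simplified stand-ins for framework.Status / framework.NodeToStatusMap (illustrative only).
type Status struct{ Reason string }

type NodeToStatusMap map[string]*Status

// Preempter mirrors the shape of the refactored interface: the caller gets back
// only the nominated node; victim deletion and nomination cleanup now happen
// inside the implementation rather than in the caller.
type Preempter interface {
    Preempt(ctx context.Context, pod string, m NodeToStatusMap) (selectedNode string, err error)
}

type fakePreempter struct{}

func (fakePreempter) Preempt(_ context.Context, pod string, m NodeToStatusMap) (string, error) {
    // A real implementation would pick a node, delete victims, and clear stale
    // nominations; this stub just nominates the first node in the map.
    for node := range m {
        return node, nil
    }
    return "", nil
}

func main() {
    m := NodeToStatusMap{"node-a": {Reason: "Insufficient cpu"}}
    node, err := fakePreempter{}.Preempt(context.Background(), "demo-pod", m)
    fmt.Println(node, err)
}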
@@ -249,29 +248,35 @@ func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (st
 // other pods with the same priority. The nominated pod prevents other pods from
 // using the nominated resources and the nominated pod could take a long time
 // before it is retried after many other pending pods.
-func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (string, []*v1.Pod, []*v1.Pod, error) {
-    // Scheduler may return various types of errors. Consider preemption only if
-    // the error is of type FitError.
-    fitError, ok := scheduleErr.(*FitError)
-    if !ok || fitError == nil {
-        return "", nil, nil, nil
+func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (string, error) {
+    cs := prof.ClientSet()
+    // TODO(Huang-Wei): get pod from informer cache instead of API server.
+    pod, err := util.GetUpdatedPod(cs, pod)
+    if err != nil {
+        klog.Errorf("Error getting the updated preemptor pod object: %v", err)
+        return "", err
     }
+
     if !podEligibleToPreemptOthers(pod, g.nodeInfoSnapshot.NodeInfos()) {
         klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
-        return "", nil, nil, nil
+        return "", nil
     }
     allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
     if err != nil {
-        return "", nil, nil, err
+        return "", err
     }
     if len(allNodes) == 0 {
-        return "", nil, nil, ErrNoNodesAvailable
+        return "", ErrNoNodesAvailable
     }
-    potentialNodes := nodesWherePreemptionMightHelp(allNodes, fitError)
+    potentialNodes := nodesWherePreemptionMightHelp(allNodes, m)
     if len(potentialNodes) == 0 {
         klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
         // In this case, we should clean-up any existing nominated node name of the pod.
-        return "", nil, []*v1.Pod{pod}, nil
+        if err := util.ClearNominatedNodeName(cs, pod); err != nil {
+            klog.Errorf("Cannot clear 'NominatedNodeName' field of pod %v/%v: %v", pod.Namespace, pod.Name, err)
+            // We do not return as this error is not critical.
+        }
+        return "", nil
     }
     if klog.V(5).Enabled() {
         var sample []string
@@ -284,12 +289,12 @@ func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, s
     if g.pdbLister != nil {
         pdbs, err = g.pdbLister.List(labels.Everything())
         if err != nil {
-            return "", nil, nil, err
+            return "", err
         }
     }
     nodeNameToVictims, err := selectNodesForPreemption(ctx, prof, g.podNominator, state, pod, potentialNodes, pdbs)
     if err != nil {
-        return "", nil, nil, err
+        return "", err
     }
 
     // We will only check nodeNameToVictims with extenders that support preemption.
@@ -297,20 +302,39 @@ func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, s
     // node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.
     nodeNameToVictims, err = g.processPreemptionWithExtenders(pod, nodeNameToVictims)
     if err != nil {
-        return "", nil, nil, err
+        return "", err
     }
 
     candidateNode := pickOneNodeForPreemption(nodeNameToVictims)
     if len(candidateNode) == 0 {
-        return "", nil, nil, nil
+        return "", nil
     }
 
+    victims := nodeNameToVictims[candidateNode].Pods
+    for _, victim := range victims {
+        if err := util.DeletePod(cs, victim); err != nil {
+            klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
+            return "", err
+        }
+        // If the victim is a WaitingPod, send a reject message to the PermitPlugin
+        if waitingPod := prof.GetWaitingPod(victim.UID); waitingPod != nil {
+            waitingPod.Reject("preempted")
+        }
+        prof.Recorder.Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", pod.Namespace, pod.Name, candidateNode)
+    }
+    metrics.PreemptionVictims.Observe(float64(len(victims)))
+
     // Lower priority pods nominated to run on this node, may no longer fit on
     // this node. So, we should remove their nomination. Removing their
     // nomination updates these pods and moves them to the active queue. It
     // lets scheduler find another place for them.
     nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode)
-    return candidateNode, nodeNameToVictims[candidateNode].Pods, nominatedPods, nil
+    if err := util.ClearNominatedNodeName(cs, nominatedPods...); err != nil {
+        klog.Errorf("Cannot clear 'NominatedNodeName' field: %v", err)
+        // We do not return as this error is not critical.
+    }
+
+    return candidateNode, nil
 }
 
 // processPreemptionWithExtenders processes preemption with extenders
@@ -1041,13 +1065,13 @@ func selectVictimsOnNode(
 
 // nodesWherePreemptionMightHelp returns a list of nodes with failed predicates
 // that may be satisfied by removing pods from the node.
-func nodesWherePreemptionMightHelp(nodes []*framework.NodeInfo, fitErr *FitError) []*framework.NodeInfo {
+func nodesWherePreemptionMightHelp(nodes []*framework.NodeInfo, m framework.NodeToStatusMap) []*framework.NodeInfo {
     var potentialNodes []*framework.NodeInfo
     for _, node := range nodes {
         name := node.Node().Name
         // We reply on the status by each plugin - 'Unschedulable' or 'UnschedulableAndUnresolvable'
        // to determine whether preemption may help or not on the node.
-        if fitErr.FilteredNodesStatuses[name].Code() == framework.UnschedulableAndUnresolvable {
+        if m[name].Code() == framework.UnschedulableAndUnresolvable {
             continue
         }
         potentialNodes = append(potentialNodes, node)
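nodesWherePreemptionMightHelp now works directly off the NodeToStatusMap produced by the Filter phase: a node whose status code is UnschedulableAndUnresolvable is skipped, because evicting pods cannot fix that kind of failure, while plain Unschedulable nodes remain preemption candidates. The following is a small, self-contained sketch of that filtering rule; Code, Status, NodeToStatusMap and potentialNodesForPreemption are simplified stand-ins written for illustration, not the real framework types.

package main

import "fmt"

// Simplified stand-ins for the framework status codes (illustrative only).
type Code int

const (
    Unschedulable Code = iota
    UnschedulableAndUnresolvable
)

type Status struct{ code Code }

// Code is nil-safe; in this sketch a missing entry is treated as resolvable.
func (s *Status) Code() Code {
    if s == nil {
        return Unschedulable
    }
    return s.code
}

type NodeToStatusMap map[string]*Status

// potentialNodesForPreemption keeps only the nodes whose filter failure might
// be fixed by evicting pods, i.e. it drops UnschedulableAndUnresolvable ones.
func potentialNodesForPreemption(nodes []string, m NodeToStatusMap) []string {
    var potential []string
    for _, name := range nodes {
        if m[name].Code() == UnschedulableAndUnresolvable {
            continue
        }
        potential = append(potential, name)
    }
    return potential
}

func main() {
    m := NodeToStatusMap{
        "node-a": {code: Unschedulable},                // e.g. not enough CPU: preemption may help
        "node-b": {code: UnschedulableAndUnresolvable}, // e.g. node selector mismatch: preemption cannot help
    }
    fmt.Println(potentialNodesForPreemption([]string{"node-a", "node-b"}, m))
    // Output: [node-a]
}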
@@ -37,6 +37,8 @@ import (
     "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/client-go/informers"
     clientsetfake "k8s.io/client-go/kubernetes/fake"
+    clienttesting "k8s.io/client-go/testing"
+    "k8s.io/client-go/tools/events"
     extenderv1 "k8s.io/kube-scheduler/extender/v1"
     volumescheduling "k8s.io/kubernetes/pkg/controller/volume/scheduling"
     schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
@@ -2021,16 +2023,13 @@ func TestNodesWherePreemptionMightHelp(t *testing.T) {
 
     for _, test := range tests {
         t.Run(test.name, func(t *testing.T) {
-            fitErr := FitError{
-                FilteredNodesStatuses: test.nodesStatuses,
-            }
             var nodeInfos []*framework.NodeInfo
             for _, n := range makeNodeList(nodeNames) {
                 ni := framework.NewNodeInfo()
                 ni.SetNode(n)
                 nodeInfos = append(nodeInfos, ni)
             }
-            nodes := nodesWherePreemptionMightHelp(nodeInfos, &fitErr)
+            nodes := nodesWherePreemptionMightHelp(nodeInfos, test.nodesStatuses)
             if len(test.expected) != len(nodes) {
                 t.Errorf("number of nodes is not the same as expected. exptectd: %d, got: %d. Nodes: %v", len(test.expected), len(nodes), nodes)
             }
@@ -2359,7 +2358,13 @@ func TestPreempt(t *testing.T) {
     labelKeys := []string{"hostname", "zone", "region"}
     for _, test := range tests {
         t.Run(test.name, func(t *testing.T) {
-            client := clientsetfake.NewSimpleClientset()
+            apiObjs := mergeObjs(test.pod, test.pods)
+            client := clientsetfake.NewSimpleClientset(apiObjs...)
+            deletedPodNames := make(sets.String)
+            client.PrependReactor("delete", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
+                deletedPodNames.Insert(action.(clienttesting.DeleteAction).GetName())
+                return true, nil, nil
+            })
             informerFactory := informers.NewSharedInformerFactory(client, 0)
 
             stop := make(chan struct{})
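With this refactor, TestPreempt no longer inspects a slice of victims returned by Preempt; it seeds the fake clientset with the test pods and records the name of every pod the algorithm deletes by prepending a reactor for the "delete" verb. The standalone snippet below illustrates that pattern in isolation using client-go's fake clientset and testing reactors; it assumes a client-go release from roughly the same era as this change (one with context-aware Delete calls), and the pod name is invented for the example.

package main

import (
    "context"
    "fmt"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/util/sets"
    clientsetfake "k8s.io/client-go/kubernetes/fake"
    clienttesting "k8s.io/client-go/testing"
)

func main() {
    victim := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "victim", Namespace: "default"}}

    // Seed the fake clientset with the pod and record every delete it receives.
    client := clientsetfake.NewSimpleClientset(victim)
    deletedPodNames := make(sets.String)
    client.PrependReactor("delete", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
        deletedPodNames.Insert(action.(clienttesting.DeleteAction).GetName())
        return true, nil, nil
    })

    // Any code that deletes pods through this client is now observable.
    _ = client.CoreV1().Pods("default").Delete(context.TODO(), "victim", metav1.DeleteOptions{})
    fmt.Println(deletedPodNames.List()) // [victim]
}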
@@ -2399,11 +2404,18 @@ func TestPreempt(t *testing.T) {
             }
 
             snapshot := internalcache.NewSnapshot(test.pods, nodes)
-            fwk, err := st.NewFramework(test.registerPlugins, framework.WithSnapshotSharedLister(snapshot))
+            fwk, err := st.NewFramework(
+                test.registerPlugins,
+                framework.WithClientSet(client),
+                framework.WithSnapshotSharedLister(snapshot),
+            )
             if err != nil {
                 t.Fatal(err)
             }
-            prof := &profile.Profile{Framework: fwk}
+            prof := &profile.Profile{
+                Framework: fwk,
+                Recorder:  &events.FakeRecorder{},
+            }
 
             scheduler := NewGenericScheduler(
                 cache,
@@ -2425,7 +2437,7 @@ func TestPreempt(t *testing.T) {
             if test.failedNodeToStatusMap != nil {
                 failedNodeToStatusMap = test.failedNodeToStatusMap
             }
-            node, victims, _, err := scheduler.Preempt(context.Background(), prof, state, test.pod, error(&FitError{Pod: test.pod, FilteredNodesStatuses: failedNodeToStatusMap}))
+            node, err := scheduler.Preempt(context.Background(), prof, state, test.pod, failedNodeToStatusMap)
             if err != nil {
                 t.Errorf("unexpected error in preemption: %v", err)
             }
@@ -2435,31 +2447,39 @@ func TestPreempt(t *testing.T) {
             if len(node) == 0 && len(test.expectedNode) != 0 {
                 t.Errorf("expected node: %v, got: nothing", test.expectedNode)
             }
-            if len(victims) != len(test.expectedPods) {
-                t.Errorf("expected %v pods, got %v.", len(test.expectedPods), len(victims))
+            if len(deletedPodNames) != len(test.expectedPods) {
+                t.Errorf("expected %v pods, got %v.", len(test.expectedPods), len(deletedPodNames))
             }
-            for _, victim := range victims {
+            for victimName := range deletedPodNames {
                 found := false
                 for _, expPod := range test.expectedPods {
-                    if expPod == victim.Name {
+                    if expPod == victimName {
                         found = true
                         break
                     }
                 }
                 if !found {
-                    t.Errorf("pod %v is not expected to be a victim.", victim.Name)
+                    t.Fatalf("pod %v is not expected to be a victim.", victimName)
                 }
-                // Mark the victims for deletion and record the preemptor's nominated node name.
-                now := metav1.Now()
-                victim.DeletionTimestamp = &now
-                test.pod.Status.NominatedNodeName = node
             }
+            test.pod.Status.NominatedNodeName = node
+            client.CoreV1().Pods(test.pod.Namespace).Update(context.TODO(), test.pod, metav1.UpdateOptions{})
+
+            // Manually set the deleted Pods' deletionTimestamp to non-nil.
+            for _, pod := range test.pods {
+                if deletedPodNames.Has(pod.Name) {
+                    now := metav1.Now()
+                    pod.DeletionTimestamp = &now
+                    deletedPodNames.Delete(pod.Name)
+                }
+            }
+
             // Call preempt again and make sure it doesn't preempt any more pods.
-            node, victims, _, err = scheduler.Preempt(context.Background(), prof, state, test.pod, error(&FitError{Pod: test.pod, FilteredNodesStatuses: failedNodeToStatusMap}))
+            node, err = scheduler.Preempt(context.Background(), prof, state, test.pod, failedNodeToStatusMap)
             if err != nil {
                 t.Errorf("unexpected error in preemption: %v", err)
             }
-            if len(node) != 0 && len(victims) > 0 {
+            if len(node) != 0 && len(deletedPodNames) > 0 {
                 t.Errorf("didn't expect any more preemption. Node %v is selected for preemption.", node)
             }
             close(stop)
@@ -2576,3 +2596,14 @@ func nodesToNodeInfos(nodes []*v1.Node, snapshot *internalcache.Snapshot) ([]*fr
     }
     return nodeInfos, nil
 }
+
+func mergeObjs(pod *v1.Pod, pods []*v1.Pod) []runtime.Object {
+    var objs []runtime.Object
+    if pod != nil {
+        objs = append(objs, pod)
+    }
+    for i := range pods {
+        objs = append(objs, pods[i])
+    }
+    return objs
+}
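mergeObjs exists because Go will not implicitly convert a []*v1.Pod into the []runtime.Object variadic that clientsetfake.NewSimpleClientset expects; each element has to be appended individually. The following is a self-contained illustration of the same conversion and of how the seeded clientset is then used; the pod names are made up for the example.

package main

import (
    "context"
    "fmt"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    clientsetfake "k8s.io/client-go/kubernetes/fake"
)

// mergeObjs mirrors the helper added above: *v1.Pod satisfies runtime.Object,
// but a []*v1.Pod slice must be copied element by element.
func mergeObjs(pod *v1.Pod, pods []*v1.Pod) []runtime.Object {
    var objs []runtime.Object
    if pod != nil {
        objs = append(objs, pod)
    }
    for i := range pods {
        objs = append(objs, pods[i])
    }
    return objs
}

func main() {
    preemptor := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "preemptor", Namespace: "default"}}
    others := []*v1.Pod{
        {ObjectMeta: metav1.ObjectMeta{Name: "p1", Namespace: "default"}},
        {ObjectMeta: metav1.ObjectMeta{Name: "p2", Namespace: "default"}},
    }
    // Seed the fake clientset so that Get/Update/Delete calls made by the code
    // under test see these pods as existing API objects.
    client := clientsetfake.NewSimpleClientset(mergeObjs(preemptor, others)...)
    pods, _ := client.CoreV1().Pods("default").List(context.TODO(), metav1.ListOptions{})
    fmt.Println(len(pods.Items)) // 3
}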
@@ -18,7 +18,6 @@ package scheduler
 
 import (
     "context"
-    "encoding/json"
     "fmt"
     "io/ioutil"
     "math/rand"
@@ -28,8 +27,6 @@ import (
     v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/runtime"
-    "k8s.io/apimachinery/pkg/types"
-    "k8s.io/apimachinery/pkg/util/strategicpatch"
     "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/client-go/informers"
     coreinformers "k8s.io/client-go/informers/core/v1"
@@ -46,6 +43,7 @@ import (
     internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
     "k8s.io/kubernetes/pkg/scheduler/metrics"
     "k8s.io/kubernetes/pkg/scheduler/profile"
+    "k8s.io/kubernetes/pkg/scheduler/util"
 )
 
 const (
@@ -55,15 +53,6 @@ const (
     pluginMetricsSamplePercent = 10
 )
 
-// PodPreemptor has methods needed to delete a pod and to update 'NominatedPod'
-// field of the preemptor pod.
-// TODO (ahmad-diaa): Remove type and replace it with scheduler methods
-type podPreemptor interface {
-    getUpdatedPod(pod *v1.Pod) (*v1.Pod, error)
-    deletePod(pod *v1.Pod) error
-    removeNominatedNodeName(pod *v1.Pod) error
-}
-
 // Scheduler watches for new unscheduled pods. It attempts to find
 // nodes that they fit on and writes bindings back to the api server.
 type Scheduler struct {
@@ -72,9 +61,6 @@ type Scheduler struct {
     SchedulerCache internalcache.Cache
 
     Algorithm core.ScheduleAlgorithm
-    // PodPreemptor is used to evict pods and update 'NominatedNode' field of
-    // the preemptor pod.
-    podPreemptor podPreemptor
 
     // NextPod should be a function that blocks until the next pod
     // is available. We don't use a channel for this, because scheduling
@@ -292,7 +278,6 @@ func New(client clientset.Interface,
     sched.DisablePreemption = options.disablePreemption
     sched.StopEverything = stopEverything
     sched.client = client
-    sched.podPreemptor = &podPreemptorImpl{client}
     sched.scheduledPodsHasSynced = podInformer.Informer().HasSynced
 
     addAllEventHandlers(sched, informerFactory, podInformer)
@@ -382,52 +367,7 @@ func updatePod(client clientset.Interface, pod *v1.Pod, condition *v1.PodConditi
     if nominatedNode != "" {
         podCopy.Status.NominatedNodeName = nominatedNode
     }
-    return patchPod(client, pod, podCopy)
-}
-
-// preempt tries to create room for a pod that has failed to schedule, by preempting lower priority pods if possible.
-// If it succeeds, it adds the name of the node where preemption has happened to the pod spec.
-// It returns the node name and an error if any.
-func (sched *Scheduler) preempt(ctx context.Context, prof *profile.Profile, state *framework.CycleState, preemptor *v1.Pod, scheduleErr error) (string, error) {
-    preemptor, err := sched.podPreemptor.getUpdatedPod(preemptor)
-    if err != nil {
-        klog.Errorf("Error getting the updated preemptor pod object: %v", err)
-        return "", err
-    }
-
-    nodeName, victims, nominatedPodsToClear, err := sched.Algorithm.Preempt(ctx, prof, state, preemptor, scheduleErr)
-    if err != nil {
-        klog.Errorf("Error preempting victims to make room for %v/%v: %v", preemptor.Namespace, preemptor.Name, err)
-        return "", err
-    }
-    if len(nodeName) != 0 {
-        for _, victim := range victims {
-            if err := sched.podPreemptor.deletePod(victim); err != nil {
-                klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
-                return "", err
-            }
-            // If the victim is a WaitingPod, send a reject message to the PermitPlugin
-            if waitingPod := prof.GetWaitingPod(victim.UID); waitingPod != nil {
-                waitingPod.Reject("preempted")
-            }
-            prof.Recorder.Eventf(victim, preemptor, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName)
-
-        }
-        metrics.PreemptionVictims.Observe(float64(len(victims)))
-    }
-    // Clearing nominated pods should happen outside of "if node != nil". Node could
-    // be nil when a pod with nominated node name is eligible to preempt again,
-    // but preemption logic does not find any node for it. In that case Preempt()
-    // function of generic_scheduler.go returns the pod itself for removal of
-    // the 'NominatedNodeName' field.
-    for _, p := range nominatedPodsToClear {
-        rErr := sched.podPreemptor.removeNominatedNodeName(p)
-        if rErr != nil {
-            klog.Errorf("Cannot remove 'NominatedNodeName' field of pod: %v", rErr)
-            // We do not return as this error is not critical.
-        }
-    }
-    return nodeName, err
+    return util.PatchPod(client, pod, podCopy)
 }
 
 // assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.
@@ -546,7 +486,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
         } else {
             preemptionStartTime := time.Now()
             // TODO(Huang-Wei): implement the preemption logic as a PostFilter plugin.
-            nominatedNode, _ = sched.preempt(schedulingCycleCtx, prof, state, pod, fitError)
+            nominatedNode, _ = sched.Algorithm.Preempt(schedulingCycleCtx, prof, state, pod, fitError.FilteredNodesStatuses)
             metrics.PreemptionAttempts.Inc()
             metrics.SchedulingAlgorithmPreemptionEvaluationDuration.Observe(metrics.SinceInSeconds(preemptionStartTime))
             metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
@@ -715,45 +655,6 @@ func (sched *Scheduler) skipPodSchedule(prof *profile.Profile, pod *v1.Pod) bool
     return false
 }
 
-type podPreemptorImpl struct {
-    Client clientset.Interface
-}
-
-func (p *podPreemptorImpl) getUpdatedPod(pod *v1.Pod) (*v1.Pod, error) {
-    return p.Client.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
-}
-
-func (p *podPreemptorImpl) deletePod(pod *v1.Pod) error {
-    return p.Client.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
-}
-
-func (p *podPreemptorImpl) removeNominatedNodeName(pod *v1.Pod) error {
-    if len(pod.Status.NominatedNodeName) == 0 {
-        return nil
-    }
-    podCopy := pod.DeepCopy()
-    podCopy.Status.NominatedNodeName = ""
-    return patchPod(p.Client, pod, podCopy)
-}
-
-func patchPod(client clientset.Interface, old *v1.Pod, new *v1.Pod) error {
-    oldData, err := json.Marshal(old)
-    if err != nil {
-        return err
-    }
-
-    newData, err := json.Marshal(new)
-    if err != nil {
-        return err
-    }
-    patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Pod{})
-    if err != nil {
-        return fmt.Errorf("failed to create merge patch for pod %q/%q: %v", old.Namespace, old.Name, err)
-    }
-    _, err = client.CoreV1().Pods(old.Namespace).Patch(context.TODO(), old.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
-    return err
-}
-
 func defaultAlgorithmSourceProviderName() *string {
     provider := schedulerapi.SchedulerDefaultProviderName
     return &provider
@@ -63,20 +63,6 @@ import (
     st "k8s.io/kubernetes/pkg/scheduler/testing"
 )
 
-type fakePodPreemptor struct{}
-
-func (fp fakePodPreemptor) getUpdatedPod(pod *v1.Pod) (*v1.Pod, error) {
-    return pod, nil
-}
-
-func (fp fakePodPreemptor) deletePod(pod *v1.Pod) error {
-    return nil
-}
-
-func (fp fakePodPreemptor) removeNominatedNodeName(pod *v1.Pod) error {
-    return nil
-}
-
 func podWithID(id, desiredHost string) *v1.Pod {
     return &v1.Pod{
         ObjectMeta: metav1.ObjectMeta{
@@ -134,8 +120,8 @@ func (es mockScheduler) Extenders() []framework.Extender {
     return nil
 }
 
-func (es mockScheduler) Preempt(ctx context.Context, profile *profile.Profile, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (string, []*v1.Pod, []*v1.Pod, error) {
-    return "", nil, nil, nil
+func (es mockScheduler) Preempt(ctx context.Context, profile *profile.Profile, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (string, error) {
+    return "", nil
 }
 
 func TestSchedulerCreation(t *testing.T) {
@@ -816,9 +802,8 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
         Error: func(p *framework.QueuedPodInfo, err error) {
             errChan <- err
         },
         Profiles: profiles,
         client: client,
-        podPreemptor: fakePodPreemptor{},
     }
 
     return sched, bindingChan, errChan
@@ -1185,61 +1170,6 @@ func TestSchedulerBinding(t *testing.T) {
     }
 }
 
-func TestRemoveNominatedNodeName(t *testing.T) {
-    tests := []struct {
-        name                     string
-        currentNominatedNodeName string
-        newNominatedNodeName     string
-        expectedPatchRequests    int
-        expectedPatchData        string
-    }{
-        {
-            name:                     "Should make patch request to clear node name",
-            currentNominatedNodeName: "node1",
-            expectedPatchRequests:    1,
-            expectedPatchData:        `{"status":{"nominatedNodeName":null}}`,
-        },
-        {
-            name:                     "Should not make patch request if nominated node is already cleared",
-            currentNominatedNodeName: "",
-            expectedPatchRequests:    0,
-        },
-    }
-    for _, test := range tests {
-        t.Run(test.name, func(t *testing.T) {
-            actualPatchRequests := 0
-            var actualPatchData string
-            cs := &clientsetfake.Clientset{}
-            cs.AddReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
-                actualPatchRequests++
-                patch := action.(clienttesting.PatchAction)
-                actualPatchData = string(patch.GetPatch())
-                // For this test, we don't care about the result of the patched pod, just that we got the expected
-                // patch request, so just returning &v1.Pod{} here is OK because scheduler doesn't use the response.
-                return true, &v1.Pod{}, nil
-            })
-
-            pod := &v1.Pod{
-                ObjectMeta: metav1.ObjectMeta{Name: "foo"},
-                Status:     v1.PodStatus{NominatedNodeName: test.currentNominatedNodeName},
-            }
-
-            preemptor := &podPreemptorImpl{Client: cs}
-            if err := preemptor.removeNominatedNodeName(pod); err != nil {
-                t.Fatalf("Error calling removeNominatedNodeName: %v", err)
-            }
-
-            if actualPatchRequests != test.expectedPatchRequests {
-                t.Fatalf("Actual patch requests (%d) dos not equal expected patch requests (%d)", actualPatchRequests, test.expectedPatchRequests)
-            }
-
-            if test.expectedPatchRequests > 0 && actualPatchData != test.expectedPatchData {
-                t.Fatalf("Patch data mismatch: Actual was %v, but expected %v", actualPatchData, test.expectedPatchData)
-            }
-        })
-    }
-}
-
 func TestUpdatePod(t *testing.T) {
     tests := []struct {
         name string
@@ -19,8 +19,11 @@ go_test(
         "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/selection:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
+        "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
+        "//staging/src/k8s.io/client-go/testing:go_default_library",
         "//staging/src/k8s.io/kube-scheduler/extender/v1:go_default_library",
         "//vendor/github.com/stretchr/testify/assert:go_default_library",
     ],
@@ -42,8 +45,12 @@ go_library(
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
+        "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
         "//staging/src/k8s.io/kube-scheduler/extender/v1:go_default_library",
         "//vendor/k8s.io/klog/v2:go_default_library",
     ],
@@ -17,10 +17,17 @@ limitations under the License.
 package util
 
 import (
+    "context"
+    "encoding/json"
+    "fmt"
     "time"
 
     v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/types"
+    utilerrors "k8s.io/apimachinery/pkg/util/errors"
+    "k8s.io/apimachinery/pkg/util/strategicpatch"
+    "k8s.io/client-go/kubernetes"
     "k8s.io/klog/v2"
     extenderv1 "k8s.io/kube-scheduler/extender/v1"
     podutil "k8s.io/kubernetes/pkg/api/v1/pod"
@@ -109,3 +116,50 @@ func GetPodAntiAffinityTerms(affinity *v1.Affinity) (terms []v1.PodAffinityTerm)
     }
     return terms
 }
+
+// PatchPod calculates the delta bytes change from <old> to <new>,
+// and then submit a request to API server to patch the pod changes.
+func PatchPod(cs kubernetes.Interface, old *v1.Pod, new *v1.Pod) error {
+    oldData, err := json.Marshal(old)
+    if err != nil {
+        return err
+    }
+
+    newData, err := json.Marshal(new)
+    if err != nil {
+        return err
+    }
+    patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Pod{})
+    if err != nil {
+        return fmt.Errorf("failed to create merge patch for pod %q/%q: %v", old.Namespace, old.Name, err)
+    }
+    _, err = cs.CoreV1().Pods(old.Namespace).Patch(context.TODO(), old.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
+    return err
+}
+
+// GetUpdatedPod returns the latest version of <pod> from API server.
+func GetUpdatedPod(cs kubernetes.Interface, pod *v1.Pod) (*v1.Pod, error) {
+    return cs.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
+}
+
+// DeletePod deletes the given <pod> from API server
+func DeletePod(cs kubernetes.Interface, pod *v1.Pod) error {
+    return cs.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
+}
+
+// ClearNominatedNodeName internally submit a patch request to API server
+// to set each pods[*].Status.NominatedNodeName> to "".
+func ClearNominatedNodeName(cs kubernetes.Interface, pods ...*v1.Pod) utilerrors.Aggregate {
+    var errs []error
+    for _, p := range pods {
+        if len(p.Status.NominatedNodeName) == 0 {
+            continue
+        }
+        podCopy := p.DeepCopy()
+        podCopy.Status.NominatedNodeName = ""
+        if err := PatchPod(cs, p, podCopy); err != nil {
+            errs = append(errs, err)
+        }
+    }
+    return utilerrors.NewAggregate(errs)
+}
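The new util.PatchPod computes a two-way strategic merge patch between the old and the modified pod and submits it against the status subresource; ClearNominatedNodeName builds on it by blanking Status.NominatedNodeName on a copy of each pod. The standalone sketch below shows what that patch body looks like when the field is cleared, which is the same {"status":{"nominatedNodeName":null}} payload the TestRemoveNominatedNodeName test added further below asserts on; the pod values here are invented for the example.

package main

import (
    "encoding/json"
    "fmt"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/strategicpatch"
)

func main() {
    old := &v1.Pod{
        ObjectMeta: metav1.ObjectMeta{Name: "foo"},
        Status:     v1.PodStatus{NominatedNodeName: "node1"},
    }
    modified := old.DeepCopy()
    modified.Status.NominatedNodeName = ""

    oldData, _ := json.Marshal(old)
    newData, _ := json.Marshal(modified)

    // A two-way merge patch encodes only what changed; clearing an omitempty
    // string field shows up as an explicit null so the server removes it.
    patch, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Pod{})
    if err != nil {
        panic(err)
    }
    fmt.Println(string(patch)) // {"status":{"nominatedNodeName":null}}
}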
@@ -23,6 +23,9 @@ import (
 
     v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/runtime"
+    clientsetfake "k8s.io/client-go/kubernetes/fake"
+    clienttesting "k8s.io/client-go/testing"
     extenderv1 "k8s.io/kube-scheduler/extender/v1"
 )
 
@@ -114,3 +117,57 @@ func TestMoreImportantPod(t *testing.T) {
         }
     }
 }
+
+func TestRemoveNominatedNodeName(t *testing.T) {
+    tests := []struct {
+        name                     string
+        currentNominatedNodeName string
+        newNominatedNodeName     string
+        expectedPatchRequests    int
+        expectedPatchData        string
+    }{
+        {
+            name:                     "Should make patch request to clear node name",
+            currentNominatedNodeName: "node1",
+            expectedPatchRequests:    1,
+            expectedPatchData:        `{"status":{"nominatedNodeName":null}}`,
+        },
+        {
+            name:                     "Should not make patch request if nominated node is already cleared",
+            currentNominatedNodeName: "",
+            expectedPatchRequests:    0,
+        },
+    }
+    for _, test := range tests {
+        t.Run(test.name, func(t *testing.T) {
+            actualPatchRequests := 0
+            var actualPatchData string
+            cs := &clientsetfake.Clientset{}
+            cs.AddReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
+                actualPatchRequests++
+                patch := action.(clienttesting.PatchAction)
+                actualPatchData = string(patch.GetPatch())
+                // For this test, we don't care about the result of the patched pod, just that we got the expected
+                // patch request, so just returning &v1.Pod{} here is OK because scheduler doesn't use the response.
+                return true, &v1.Pod{}, nil
+            })
+
+            pod := &v1.Pod{
+                ObjectMeta: metav1.ObjectMeta{Name: "foo"},
+                Status:     v1.PodStatus{NominatedNodeName: test.currentNominatedNodeName},
+            }
+
+            if err := ClearNominatedNodeName(cs, pod); err != nil {
+                t.Fatalf("Error calling removeNominatedNodeName: %v", err)
+            }
+
+            if actualPatchRequests != test.expectedPatchRequests {
+                t.Fatalf("Actual patch requests (%d) dos not equal expected patch requests (%d)", actualPatchRequests, test.expectedPatchRequests)
+            }
+
+            if test.expectedPatchRequests > 0 && actualPatchData != test.expectedPatchData {
+                t.Fatalf("Patch data mismatch: Actual was %v, but expected %v", actualPatchData, test.expectedPatchData)
+            }
+        })
+    }
+}