Migrated pkg/scheduler/framework/preemption to use contextual logging

Mengjiao Liu 2023-03-22 12:02:57 +08:00
parent 9c6414cdfe
commit eeb1399383
2 changed files with 30 additions and 24 deletions
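The change follows the contextual-logging pattern: functions that already receive a context.Context obtain their logger with klog.FromContext(ctx), helpers without a context take an explicit klog.Logger parameter, and the global klog.InfoS/ErrorS calls become logger.Info/logger.Error. A minimal sketch of that pattern, with hypothetical helper names rather than the scheduler's actual functions:

package main

import (
    "context"

    "k8s.io/klog/v2"
)

// doWork stands in for a function like Preempt or findCandidates: it has a
// context, so it pulls the logger out of it instead of using klog globals.
func doWork(ctx context.Context) {
    logger := klog.FromContext(ctx)
    logger.V(5).Info("Doing work")
}

// pickSomething stands in for helpers like SelectCandidate or
// pickOneNodeForPreemption: no context is available, so the caller passes
// the logger explicitly.
func pickSomething(logger klog.Logger, items []string) string {
    if len(items) == 0 {
        logger.Error(nil, "No items to pick from")
        return ""
    }
    return items[0]
}

func main() {
    // Attach a logger to the context; callees retrieve it via klog.FromContext.
    ctx := klog.NewContext(context.Background(), klog.Background())
    doWork(ctx)
    _ = pickSomething(klog.FromContext(ctx), []string{"node-a", "node-b"})
}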

View File

@@ -36,6 +36,7 @@ import (
     clientsetfake "k8s.io/client-go/kubernetes/fake"
     clienttesting "k8s.io/client-go/testing"
     "k8s.io/client-go/tools/events"
+    "k8s.io/klog/v2/ktesting"
     kubeschedulerconfigv1beta2 "k8s.io/kube-scheduler/config/v1beta2"
     extenderv1 "k8s.io/kube-scheduler/extender/v1"
     "k8s.io/kubernetes/pkg/scheduler/apis/config"
@@ -1337,7 +1338,8 @@ func TestSelectBestCandidate(t *testing.T) {
     cs := clientsetfake.NewSimpleClientset(objs...)
     informerFactory := informers.NewSharedInformerFactory(cs, 0)
     snapshot := internalcache.NewSnapshot(tt.pods, nodes)
-    ctx, cancel := context.WithCancel(context.Background())
+    logger, ctx := ktesting.NewTestContext(t)
+    ctx, cancel := context.WithCancel(ctx)
     defer cancel()
     fwk, err := st.NewFramework(
         []st.RegisterPluginFunc{
@@ -1380,7 +1382,7 @@ func TestSelectBestCandidate(t *testing.T) {
     }
     offset, numCandidates := pl.GetOffsetAndNumCandidates(int32(len(nodeInfos)))
     candidates, _, _ := pe.DryRunPreemption(ctx, tt.pod, nodeInfos, nil, offset, numCandidates)
-    s := pe.SelectCandidate(candidates)
+    s := pe.SelectCandidate(logger, candidates)
     if s == nil || len(s.Name()) == 0 {
         return
     }
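In tests, the same wiring comes from k8s.io/klog/v2/ktesting, as the hunk above shows. A minimal sketch (a hypothetical test, not the actual TestSelectBestCandidate) of how ktesting.NewTestContext is typically used:

package example

import (
    "context"
    "testing"

    "k8s.io/klog/v2/ktesting"
)

func TestWithTestContext(t *testing.T) {
    // NewTestContext returns a logger that writes through t.Log and a context
    // that already carries that logger for code calling klog.FromContext(ctx).
    logger, ctx := ktesting.NewTestContext(t)
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    // The logger can also be passed explicitly to helpers that take a
    // klog.Logger parameter, as SelectCandidate now does.
    logger.V(5).Info("Starting test step")
    _ = ctx
}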

View File

@@ -150,6 +150,8 @@ type Evaluator struct {
 // - <non-nil PostFilterResult, Success>. It's the regular happy path
 //   and the non-empty nominatedNodeName will be applied to the preemptor pod.
 func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
+    logger := klog.FromContext(ctx)
+
     // 0) Fetch the latest version of <pod>.
     // It's safe to directly fetch pod here. Because the informer cache has already been
     // initialized when creating the Scheduler obj.
@@ -157,13 +159,13 @@ func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeT
     podNamespace, podName := pod.Namespace, pod.Name
     pod, err := ev.PodLister.Pods(pod.Namespace).Get(pod.Name)
     if err != nil {
-        klog.ErrorS(err, "Getting the updated preemptor pod object", "pod", klog.KRef(podNamespace, podName))
+        logger.Error(err, "Could not get the updated preemptor pod object", "pod", klog.KRef(podNamespace, podName))
         return nil, framework.AsStatus(err)
     }

     // 1) Ensure the preemptor is eligible to preempt other pods.
     if ok, msg := ev.PodEligibleToPreemptOthers(pod, m[pod.Status.NominatedNodeName]); !ok {
-        klog.V(5).InfoS("Pod is not eligible for preemption", "pod", klog.KObj(pod), "reason", msg)
+        logger.V(5).Info("Pod is not eligible for preemption", "pod", klog.KObj(pod), "reason", msg)
         return nil, framework.NewStatus(framework.Unschedulable, msg)
     }
@@ -188,13 +190,13 @@ func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeT
     }

     // 3) Interact with registered Extenders to filter out some candidates if needed.
-    candidates, status := ev.callExtenders(pod, candidates)
+    candidates, status := ev.callExtenders(logger, pod, candidates)
     if !status.IsSuccess() {
         return nil, status
     }

     // 4) Find the best candidate.
-    bestCandidate := ev.SelectCandidate(candidates)
+    bestCandidate := ev.SelectCandidate(logger, candidates)
     if bestCandidate == nil || len(bestCandidate.Name()) == 0 {
         return nil, framework.NewStatus(framework.Unschedulable, "no candidate node for preemption")
     }
@@ -217,12 +219,13 @@ func (ev *Evaluator) findCandidates(ctx context.Context, pod *v1.Pod, m framewor
     if len(allNodes) == 0 {
         return nil, nil, errors.New("no nodes available")
     }
+    logger := klog.FromContext(ctx)
     potentialNodes, unschedulableNodeStatus := nodesWherePreemptionMightHelp(allNodes, m)
     if len(potentialNodes) == 0 {
-        klog.V(3).InfoS("Preemption will not help schedule pod on any node", "pod", klog.KObj(pod))
+        logger.V(3).Info("Preemption will not help schedule pod on any node", "pod", klog.KObj(pod))
         // In this case, we should clean-up any existing nominated node name of the pod.
         if err := util.ClearNominatedNodeName(ctx, ev.Handler.ClientSet(), pod); err != nil {
-            klog.ErrorS(err, "Cannot clear 'NominatedNodeName' field of pod", "pod", klog.KObj(pod))
+            logger.Error(err, "Could not clear the nominatedNodeName field of pod", "pod", klog.KObj(pod))
             // We do not return as this error is not critical.
         }
         return nil, unschedulableNodeStatus, nil
@@ -234,12 +237,12 @@ func (ev *Evaluator) findCandidates(ctx context.Context, pod *v1.Pod, m framewor
     }

     offset, numCandidates := ev.GetOffsetAndNumCandidates(int32(len(potentialNodes)))
-    if klogV := klog.V(5); klogV.Enabled() {
+    if loggerV := logger.V(5); loggerV.Enabled() {
         var sample []string
         for i := offset; i < offset+10 && i < int32(len(potentialNodes)); i++ {
             sample = append(sample, potentialNodes[i].Node().Name)
         }
-        klogV.InfoS("Selecting candidates from a pool of nodes", "potentialNodesCount", len(potentialNodes), "offset", offset, "sampleLength", len(sample), "sample", sample, "candidates", numCandidates)
+        loggerV.Info("Selected candidates from a pool of nodes", "potentialNodesCount", len(potentialNodes), "offset", offset, "sampleLength", len(sample), "sample", sample, "candidates", numCandidates)
     }
     candidates, nodeStatuses, err := ev.DryRunPreemption(ctx, pod, potentialNodes, pdbs, offset, numCandidates)
     for node, nodeStatus := range unschedulableNodeStatus {
@@ -252,7 +255,7 @@ func (ev *Evaluator) findCandidates(ctx context.Context, pod *v1.Pod, m framewor
 // We will only check <candidates> with extenders that support preemption.
 // Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated
 // node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.
-func (ev *Evaluator) callExtenders(pod *v1.Pod, candidates []Candidate) ([]Candidate, *framework.Status) {
+func (ev *Evaluator) callExtenders(logger klog.Logger, pod *v1.Pod, candidates []Candidate) ([]Candidate, *framework.Status) {
     extenders := ev.Handler.Extenders()
     nodeLister := ev.Handler.SnapshotSharedLister().NodeInfos()
     if len(extenders) == 0 {
@@ -272,8 +275,8 @@ func (ev *Evaluator) callExtenders(pod *v1.Pod, candidates []Candidate) ([]Candi
         nodeNameToVictims, err := extender.ProcessPreemption(pod, victimsMap, nodeLister)
         if err != nil {
             if extender.IsIgnorable() {
-                klog.InfoS("Skipping extender as it returned error and has ignorable flag set",
-                    "extender", extender, "err", err)
+                logger.Info("Skipped extender as it returned error and has ignorable flag set",
+                    "extender", extender.Name(), "err", err)
                 continue
             }
             return nil, framework.AsStatus(err)
@@ -283,7 +286,7 @@ func (ev *Evaluator) callExtenders(pod *v1.Pod, candidates []Candidate) ([]Candi
         if victims == nil || len(victims.Pods) == 0 {
             if extender.IsIgnorable() {
                 delete(nodeNameToVictims, nodeName)
-                klog.InfoS("Ignoring node without victims", "node", klog.KRef("", nodeName))
+                logger.Info("Ignored node for which the extender didn't report victims", "node", klog.KRef("", nodeName), "extender", extender.Name())
                 continue
             }
             return nil, framework.AsStatus(fmt.Errorf("expected at least one victim pod on node %q", nodeName))
@@ -312,7 +315,7 @@ func (ev *Evaluator) callExtenders(pod *v1.Pod, candidates []Candidate) ([]Candi
 // SelectCandidate chooses the best-fit candidate from given <candidates> and return it.
 // NOTE: This method is exported for easier testing in default preemption.
-func (ev *Evaluator) SelectCandidate(candidates []Candidate) Candidate {
+func (ev *Evaluator) SelectCandidate(logger klog.Logger, candidates []Candidate) Candidate {
     if len(candidates) == 0 {
         return nil
     }
@@ -321,7 +324,7 @@ func (ev *Evaluator) SelectCandidate(candidates []Candidate) Candidate {
     }
     victimsMap := ev.CandidatesToVictimsMap(candidates)
-    candidateNode := pickOneNodeForPreemption(victimsMap)
+    candidateNode := pickOneNodeForPreemption(logger, victimsMap)

     // Same as candidatesToVictimsMap, this logic is not applicable for out-of-tree
     // preemption plugins that exercise different candidates on the same nominated node.
@@ -333,7 +336,7 @@ func (ev *Evaluator) SelectCandidate(candidates []Candidate) Candidate {
     }

     // We shouldn't reach here.
-    klog.ErrorS(errors.New("no candidate selected"), "Should not reach here", "candidates", candidates)
+    logger.Error(errors.New("no candidate selected"), "Should not reach here", "candidates", candidates)
     // To not break the whole flow, return the first candidate.
     return candidates[0]
 }
@@ -348,6 +351,7 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.
     ctx, cancel := context.WithCancel(ctx)
     defer cancel()
+    logger := klog.FromContext(ctx)
     errCh := parallelize.NewErrorChannel()
     preemptPod := func(index int) {
         victim := c.Victims().Pods[index]
@@ -367,13 +371,13 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.
             )
             if _, err := cs.CoreV1().Pods(victim.Namespace).ApplyStatus(ctx, victimPodApply, metav1.ApplyOptions{FieldManager: fieldManager, Force: true}); err != nil {
-                klog.ErrorS(err, "Preparing pod preemption", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
+                logger.Error(err, "Could not add DisruptionTarget condition due to preemption", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
                 errCh.SendErrorWithCancel(err, cancel)
                 return
             }
         }

         if err := util.DeletePod(ctx, cs, victim); err != nil {
-            klog.ErrorS(err, "Preempting pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
+            logger.Error(err, "Preempted pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
             errCh.SendErrorWithCancel(err, cancel)
             return
         }
@@ -392,9 +396,9 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.
     // this node. So, we should remove their nomination. Removing their
     // nomination updates these pods and moves them to the active queue. It
     // lets scheduler find another place for them.
-    nominatedPods := getLowerPriorityNominatedPods(fh, pod, c.Name())
+    nominatedPods := getLowerPriorityNominatedPods(logger, fh, pod, c.Name())
     if err := util.ClearNominatedNodeName(ctx, cs, nominatedPods...); err != nil {
-        klog.ErrorS(err, "Cannot clear 'NominatedNodeName' field")
+        logger.Error(err, "Cannot clear 'NominatedNodeName' field")
         // We do not return as this error is not critical.
     }
@@ -437,7 +441,7 @@ func getPodDisruptionBudgets(pdbLister policylisters.PodDisruptionBudgetLister)
 // 6. If there are still ties, the first such node is picked (sort of randomly).
 // The 'minNodes1' and 'minNodes2' are being reused here to save the memory
 // allocation and garbage collection time.
-func pickOneNodeForPreemption(nodesToVictims map[string]*extenderv1.Victims) string {
+func pickOneNodeForPreemption(logger klog.Logger, nodesToVictims map[string]*extenderv1.Victims) string {
     if len(nodesToVictims) == 0 {
         return ""
     }
@@ -477,7 +481,7 @@ func pickOneNodeForPreemption(nodesToVictims map[string]*extenderv1.Victims) str
         // Get earliest start time of all pods on the current node.
         earliestStartTimeOnNode := util.GetEarliestPodStartTime(nodesToVictims[node])
         if earliestStartTimeOnNode == nil {
-            klog.ErrorS(errors.New("earliestStartTime is nil for node"), "Should not reach here", "node", node)
+            logger.Error(errors.New("earliestStartTime is nil for node"), "Should not reach here", "node", node)
             return int64(math.MinInt64)
         }
         // The bigger the earliestStartTimeOnNode, the higher the score.
@@ -530,7 +534,7 @@ func pickOneNodeForPreemption(nodesToVictims map[string]*extenderv1.Victims) str
 // manipulation of NodeInfo and PreFilter state per nominated pod. It may not be
 // worth the complexity, especially because we generally expect to have a very
 // small number of nominated pods per node.
-func getLowerPriorityNominatedPods(pn framework.PodNominator, pod *v1.Pod, nodeName string) []*v1.Pod {
+func getLowerPriorityNominatedPods(logger klog.Logger, pn framework.PodNominator, pod *v1.Pod, nodeName string) []*v1.Pod {
     podInfos := pn.NominatedPodsForNode(nodeName)
     if len(podInfos) == 0 {