Merge pull request #120933 from mengjiao-liu/contextual-logging-scheduler-remaining-part

kube-scheduler: convert the remaining part to use contextual logging
This commit is contained in:
Kubernetes Prow Robot 2023-10-27 10:30:58 +02:00 committed by GitHub
commit fd5c406112
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 109 additions and 92 deletions

View File

@ -26,14 +26,12 @@ structured k8s.io/apiserver/pkg/server/options/encryptionconfig/.*
# The following packages have been migrated to contextual logging.
# Packages matched here do not have to be listed above because
# "contextual" implies "structured".
# TODO next: contextual k8s.io/kubernetes/pkg/scheduler/.*
# A few files involved in startup migrated already to contextual
# We can't enable contextual logcheck until all are migrated
contextual k8s.io/client-go/tools/events/.*
contextual k8s.io/client-go/tools/record/.*
contextual k8s.io/dynamic-resource-allocation/.*
contextual k8s.io/kubernetes/cmd/kube-scheduler/.*
contextual k8s.io/kubernetes/pkg/controller/.*
contextual k8s.io/kubernetes/pkg/scheduler/.*
contextual k8s.io/kubernetes/test/e2e/dra/.*
# As long as contextual logging is alpha or beta, all WithName, WithValues,

View File

@ -146,7 +146,7 @@ func (pl *DefaultPreemption) SelectVictimsOnNode(
logger := klog.FromContext(ctx)
var potentialVictims []*framework.PodInfo
removePod := func(rpi *framework.PodInfo) error {
if err := nodeInfo.RemovePod(rpi.Pod); err != nil {
if err := nodeInfo.RemovePod(logger, rpi.Pod); err != nil {
return err
}
status := pl.fh.RunPreFilterExtensionRemovePod(ctx, state, pod, rpi, nodeInfo)

View File

@ -353,7 +353,7 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.
// Otherwise we should delete the victim.
if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil {
waitingPod.Reject(pluginName, "preempted")
klog.V(2).InfoS("Preemptor pod rejected a waiting pod", "preemptor", klog.KObj(pod), "waitingPod", klog.KObj(victim), "node", c.Name())
logger.V(2).Info("Preemptor pod rejected a waiting pod", "preemptor", klog.KObj(pod), "waitingPod", klog.KObj(victim), "node", c.Name())
} else {
if feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
condition := &v1.PodCondition{
@ -377,7 +377,7 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.
errCh.SendErrorWithCancel(err, cancel)
return
}
klog.V(2).InfoS("Preemptor Pod preempted victim Pod", "preemptor", klog.KObj(pod), "victim", klog.KObj(victim), "node", c.Name())
logger.V(2).Info("Preemptor Pod preempted victim Pod", "preemptor", klog.KObj(pod), "victim", klog.KObj(victim), "node", c.Name())
}
fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by pod %v on node %v", pod.UID, c.Name())

View File

@ -748,12 +748,12 @@ func podWithRequiredAntiAffinity(p *v1.Pod) bool {
len(affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0
}
func removeFromSlice(s []*PodInfo, k string) ([]*PodInfo, bool) {
func removeFromSlice(logger klog.Logger, s []*PodInfo, k string) ([]*PodInfo, bool) {
var removed bool
for i := range s {
tmpKey, err := GetPodKey(s[i].Pod)
if err != nil {
klog.ErrorS(err, "Cannot get pod key", "pod", klog.KObj(s[i].Pod))
logger.Error(err, "Cannot get pod key", "pod", klog.KObj(s[i].Pod))
continue
}
if k == tmpKey {
@ -772,20 +772,20 @@ func removeFromSlice(s []*PodInfo, k string) ([]*PodInfo, bool) {
}
// RemovePod subtracts pod information from this NodeInfo.
func (n *NodeInfo) RemovePod(pod *v1.Pod) error {
func (n *NodeInfo) RemovePod(logger klog.Logger, pod *v1.Pod) error {
k, err := GetPodKey(pod)
if err != nil {
return err
}
if podWithAffinity(pod) {
n.PodsWithAffinity, _ = removeFromSlice(n.PodsWithAffinity, k)
n.PodsWithAffinity, _ = removeFromSlice(logger, n.PodsWithAffinity, k)
}
if podWithRequiredAntiAffinity(pod) {
n.PodsWithRequiredAntiAffinity, _ = removeFromSlice(n.PodsWithRequiredAntiAffinity, k)
n.PodsWithRequiredAntiAffinity, _ = removeFromSlice(logger, n.PodsWithRequiredAntiAffinity, k)
}
var removed bool
if n.Pods, removed = removeFromSlice(n.Pods, k); removed {
if n.Pods, removed = removeFromSlice(logger, n.Pods, k); removed {
n.update(pod, -1)
return nil
}

View File

@ -31,6 +31,7 @@ import (
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/features"
st "k8s.io/kubernetes/pkg/scheduler/testing"
"k8s.io/kubernetes/test/utils/ktesting"
)
func TestNewResource(t *testing.T) {
@ -1083,10 +1084,11 @@ func TestNodeInfoRemovePod(t *testing.T) {
for i, test := range tests {
t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
ni := fakeNodeInfo(pods...)
gen := ni.Generation
err := ni.RemovePod(test.pod)
err := ni.RemovePod(logger, test.pod)
if err != nil {
if test.errExpected {
expectedErrorMsg := fmt.Errorf("no corresponding pod %s in pods of node %s", test.pod.Name, ni.Node().Name)

View File

@ -466,7 +466,7 @@ func (cache *cacheImpl) removePod(logger klog.Logger, pod *v1.Pod) error {
if !ok {
logger.Error(nil, "Node not found when trying to remove pod", "node", klog.KRef("", pod.Spec.NodeName), "podKey", key, "pod", klog.KObj(pod))
} else {
if err := n.info.RemovePod(pod); err != nil {
if err := n.info.RemovePod(logger, pod); err != nil {
return err
}
if len(n.info.Pods) == 0 && n.info.Node() == nil {

View File

@ -104,7 +104,7 @@ type SchedulingQueue interface {
SchedulingCycle() int64
// Pop removes the head of the queue and returns it. It blocks if the
// queue is empty and waits until a new item is added to the queue.
Pop() (*framework.QueuedPodInfo, error)
Pop(logger klog.Logger) (*framework.QueuedPodInfo, error)
// Done must be called for pod returned by Pop. This allows the queue to
// keep track of which pods are currently being processed.
Done(types.UID)
@ -859,7 +859,7 @@ func (p *PriorityQueue) flushUnschedulablePodsLeftover(logger klog.Logger) {
// Pop removes the head of the active queue and returns it. It blocks if the
// activeQ is empty and waits until a new item is added to the queue. It
// increments scheduling cycle when a pod is popped.
func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) {
func (p *PriorityQueue) Pop(logger klog.Logger) (*framework.QueuedPodInfo, error) {
p.lock.Lock()
defer p.lock.Unlock()
for p.activeQ.Len() == 0 {
@ -867,7 +867,7 @@ func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) {
// When Close() is called, the p.closed is set and the condition is broadcast,
// which causes this loop to continue and return from the Pop().
if p.closed {
klog.V(2).InfoS("Scheduling queue is closed")
logger.V(2).Info("Scheduling queue is closed")
return nil, nil
}
p.cond.Wait()

View File

@ -137,13 +137,13 @@ func TestPriorityQueue_Add(t *testing.T) {
if diff := cmp.Diff(q.nominator, expectedNominatedPods, nominatorCmpOpts...); diff != "" {
t.Errorf("Unexpected diff after adding pods (-want, +got):\n%s", diff)
}
if p, err := q.Pop(); err != nil || p.Pod != highPriorityPodInfo.Pod {
if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name)
}
if p, err := q.Pop(); err != nil || p.Pod != medPriorityPodInfo.Pod {
if p, err := q.Pop(logger); err != nil || p.Pod != medPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name)
}
if p, err := q.Pop(); err != nil || p.Pod != unschedulablePodInfo.Pod {
if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name)
}
if len(q.nominator.nominatedPods["node1"]) != 2 {
@ -168,10 +168,10 @@ func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) {
if err := q.Add(logger, highPriorityPodInfo.Pod); err != nil {
t.Errorf("add failed: %v", err)
}
if p, err := q.Pop(); err != nil || p.Pod != highPriorityPodInfo.Pod {
if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name)
}
if p, err := q.Pop(); err != nil || p.Pod != medPriorityPodInfo.Pod {
if p, err := q.Pop(logger); err != nil || p.Pod != medPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name)
}
}
@ -185,6 +185,7 @@ func listToValues(l *list.List) []interface{} {
}
func Test_InFlightPods(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
pod := st.MakePod().Name("targetpod").UID("pod1").Obj()
pod2 := st.MakePod().Name("targetpod2").UID("pod2").Obj()
pod3 := st.MakePod().Name("targetpod3").UID("pod3").Obj()
@ -487,9 +488,9 @@ func Test_InFlightPods(t *testing.T) {
isSchedulingQueueHintEnabled: true,
initialPods: []*v1.Pod{pod, pod2},
actions: []action{
{callback: func(t *testing.T, q *PriorityQueue) { poppedPod = popPod(t, q, pod) }},
{callback: func(t *testing.T, q *PriorityQueue) { poppedPod = popPod(t, logger, q, pod) }},
{eventHappens: &NodeAdd},
{callback: func(t *testing.T, q *PriorityQueue) { poppedPod2 = popPod(t, q, pod2) }},
{callback: func(t *testing.T, q *PriorityQueue) { poppedPod2 = popPod(t, logger, q, pod2) }},
{eventHappens: &AssignedPodAdd},
{callback: func(t *testing.T, q *PriorityQueue) {
logger, _ := ktesting.NewTestContext(t)
@ -539,7 +540,7 @@ func Test_InFlightPods(t *testing.T) {
isSchedulingQueueHintEnabled: true,
initialPods: []*v1.Pod{pod},
actions: []action{
{callback: func(t *testing.T, q *PriorityQueue) { poppedPod = popPod(t, q, pod) }},
{callback: func(t *testing.T, q *PriorityQueue) { poppedPod = popPod(t, logger, q, pod) }},
{callback: func(t *testing.T, q *PriorityQueue) {
logger, _ := ktesting.NewTestContext(t)
// Unschedulable due to PendingPlugins.
@ -551,7 +552,7 @@ func Test_InFlightPods(t *testing.T) {
}},
{eventHappens: &PvAdd}, // Active again.
{callback: func(t *testing.T, q *PriorityQueue) {
poppedPod = popPod(t, q, pod)
poppedPod = popPod(t, logger, q, pod)
if len(poppedPod.UnschedulablePlugins) > 0 {
t.Errorf("QueuedPodInfo from Pop should have empty UnschedulablePlugins, got instead: %+v", poppedPod)
}
@ -597,7 +598,7 @@ func Test_InFlightPods(t *testing.T) {
for _, action := range test.actions {
switch {
case action.podPopped != nil:
popPod(t, q, action.podPopped)
popPod(t, logger, q, action.podPopped)
case action.eventHappens != nil:
q.MoveAllToActiveOrBackoffQueue(logger, *action.eventHappens, nil, nil, nil)
case action.podEnqueued != nil:
@ -679,8 +680,8 @@ func Test_InFlightPods(t *testing.T) {
}
}
func popPod(t *testing.T, q *PriorityQueue, pod *v1.Pod) *framework.QueuedPodInfo {
p, err := q.Pop()
func popPod(t *testing.T, logger klog.Logger, q *PriorityQueue, pod *v1.Pod) *framework.QueuedPodInfo {
p, err := q.Pop(logger)
if err != nil {
t.Fatalf("Pop failed: %v", err)
}
@ -714,7 +715,7 @@ func TestPop(t *testing.T) {
q.Add(logger, pod)
// Simulate failed attempt that makes the pod unschedulable.
poppedPod := popPod(t, q, pod)
poppedPod := popPod(t, logger, q, pod)
// We put register the plugin to PendingPlugins so that it's interpreted as queueImmediately and skip backoff.
poppedPod.PendingPlugins = sets.New("fooPlugin1")
if err := q.AddUnschedulableIfNotPresent(logger, poppedPod, q.SchedulingCycle()); err != nil {
@ -725,7 +726,7 @@ func TestPop(t *testing.T) {
q.MoveAllToActiveOrBackoffQueue(logger, PvAdd, nil, nil, nil)
// Now check result of Pop.
poppedPod = popPod(t, q, pod)
poppedPod = popPod(t, logger, q, pod)
if len(poppedPod.PendingPlugins) > 0 {
t.Errorf("QueuedPodInfo from Pop should have empty PendingPlugins, got instead: %+v", poppedPod)
}
@ -742,7 +743,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
// insert unschedulablePodInfo and pop right after that
// because the scheduling queue records unschedulablePod as in-flight Pod.
q.Add(logger, unschedulablePodInfo.Pod)
if p, err := q.Pop(); err != nil || p.Pod != unschedulablePodInfo.Pod {
if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name)
}
@ -763,7 +764,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
if diff := cmp.Diff(q.nominator, expectedNominatedPods, nominatorCmpOpts...); diff != "" {
t.Errorf("Unexpected diff after adding pods (-want, +got):\n%s", diff)
}
if p, err := q.Pop(); err != nil || p.Pod != highPriNominatedPodInfo.Pod {
if p, err := q.Pop(logger); err != nil || p.Pod != highPriNominatedPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriNominatedPodInfo.Pod.Name, p.Pod.Name)
}
if len(q.nominator.nominatedPods) != 1 {
@ -796,7 +797,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) {
// Pop all pods except for the first one
for i := totalNum - 1; i > 0; i-- {
p, _ := q.Pop()
p, _ := q.Pop(logger)
if diff := cmp.Diff(&expectedPods[i], p.Pod); diff != "" {
t.Errorf("Unexpected pod (-want, +got):\n%s", diff)
}
@ -806,7 +807,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) {
q.MoveAllToActiveOrBackoffQueue(logger, WildCardEvent, nil, nil, nil)
oldCycle := q.SchedulingCycle()
firstPod, _ := q.Pop()
firstPod, _ := q.Pop(logger)
if diff := cmp.Diff(&expectedPods[0], firstPod.Pod); diff != "" {
t.Errorf("Unexpected pod (-want, +got):\n%s", diff)
}
@ -849,7 +850,7 @@ func TestPriorityQueue_Pop(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
if p, err := q.Pop(); err != nil || p.Pod != medPriorityPodInfo.Pod {
if p, err := q.Pop(logger); err != nil || p.Pod != medPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name)
}
if len(q.nominator.nominatedPods["node1"]) != 1 {
@ -897,7 +898,7 @@ func TestPriorityQueue_Update(t *testing.T) {
if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(unschedulablePodInfo.Pod)); !exists {
t.Errorf("Expected: %v to be added to activeQ.", unschedulablePodInfo.Pod.Name)
}
if p, err := q.Pop(); err != nil || p.Pod != highPriNominatedPodInfo.Pod {
if p, err := q.Pop(logger); err != nil || p.Pod != highPriNominatedPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name)
}
@ -916,7 +917,7 @@ func TestPriorityQueue_Update(t *testing.T) {
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before testing AddUnschedulableIfNotPresent.
q.activeQ.Add(podInfo)
if p, err := q.Pop(); err != nil || p.Pod != medPriorityPodInfo.Pod {
if p, err := q.Pop(logger); err != nil || p.Pod != medPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name)
}
err = q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(medPriorityPodInfo.Pod, "plugin"), q.SchedulingCycle())
@ -940,7 +941,7 @@ func TestPriorityQueue_Update(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error from activeQ.Add: %v", err)
}
if p, err := q.Pop(); err != nil || p.Pod != medPriorityPodInfo.Pod {
if p, err := q.Pop(logger); err != nil || p.Pod != medPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name)
}
// updating a pod which is in unschedulable queue, and it is not backing off,
@ -958,7 +959,7 @@ func TestPriorityQueue_Update(t *testing.T) {
// and the pods will be moved into activeQ.
c.Step(q.podInitialBackoffDuration)
q.Update(logger, medPriorityPodInfo.Pod, updatedPod)
if p, err := q.Pop(); err != nil || p.Pod != updatedPod {
if p, err := q.Pop(logger); err != nil || p.Pod != updatedPod {
t.Errorf("Expected: %v after Pop, but got: %v", updatedPod.Name, p.Pod.Name)
}
}
@ -1330,7 +1331,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithQueueingHint(t *testing.
q := NewTestQueue(ctx, newDefaultQueueSort(), WithQueueingHintMapPerProfile(m), WithClock(cl))
// add to unsched pod pool
q.activeQ.Add(q.newQueuedPodInfo(test.podInfo.Pod))
if p, err := q.Pop(); err != nil || p.Pod != test.podInfo.Pod {
if p, err := q.Pop(logger); err != nil || p.Pod != test.podInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", test.podInfo.Pod.Name, p.Pod.Name)
}
err := q.AddUnschedulableIfNotPresent(logger, test.podInfo, q.SchedulingCycle())
@ -1371,12 +1372,12 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m))
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below.
q.activeQ.Add(q.newQueuedPodInfo(unschedulablePodInfo.Pod))
if p, err := q.Pop(); err != nil || p.Pod != unschedulablePodInfo.Pod {
if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name)
}
expectInFlightPods(t, q, unschedulablePodInfo.Pod.UID)
q.activeQ.Add(q.newQueuedPodInfo(highPriorityPodInfo.Pod))
if p, err := q.Pop(); err != nil || p.Pod != highPriorityPodInfo.Pod {
if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name)
}
expectInFlightPods(t, q, unschedulablePodInfo.Pod.UID, highPriorityPodInfo.Pod.UID)
@ -1392,7 +1393,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
// Construct a Pod, but don't associate its scheduler failure to any plugin
hpp1 := clonePod(highPriorityPodInfo.Pod, "hpp1")
q.activeQ.Add(q.newQueuedPodInfo(hpp1))
if p, err := q.Pop(); err != nil || p.Pod != hpp1 {
if p, err := q.Pop(logger); err != nil || p.Pod != hpp1 {
t.Errorf("Expected: %v after Pop, but got: %v", hpp1, p.Pod.Name)
}
expectInFlightPods(t, q, hpp1.UID)
@ -1405,7 +1406,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
// Construct another Pod, and associate its scheduler failure to plugin "barPlugin".
hpp2 := clonePod(highPriorityPodInfo.Pod, "hpp2")
q.activeQ.Add(q.newQueuedPodInfo(hpp2))
if p, err := q.Pop(); err != nil || p.Pod != hpp2 {
if p, err := q.Pop(logger); err != nil || p.Pod != hpp2 {
t.Errorf("Expected: %v after Pop, but got: %v", hpp2, p.Pod.Name)
}
expectInFlightPods(t, q, hpp2.UID)
@ -1423,7 +1424,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
t.Errorf("Expected 1 item to be in activeQ, but got: %v", q.activeQ.Len())
}
// Pop out the medPriorityPodInfo in activeQ.
if p, err := q.Pop(); err != nil || p.Pod != medPriorityPodInfo.Pod {
if p, err := q.Pop(logger); err != nil || p.Pod != medPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod, p.Pod.Name)
}
expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID)
@ -1441,17 +1442,17 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
q.schedulingCycle++
q.activeQ.Add(q.newQueuedPodInfo(unschedulablePodInfo.Pod))
if p, err := q.Pop(); err != nil || p.Pod != unschedulablePodInfo.Pod {
if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name)
}
expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, unschedulablePodInfo.Pod.UID)
q.activeQ.Add(q.newQueuedPodInfo(highPriorityPodInfo.Pod))
if p, err := q.Pop(); err != nil || p.Pod != highPriorityPodInfo.Pod {
if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name)
}
expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, unschedulablePodInfo.Pod.UID, highPriorityPodInfo.Pod.UID)
q.activeQ.Add(q.newQueuedPodInfo(hpp1))
if p, err := q.Pop(); err != nil || p.Pod != hpp1 {
if p, err := q.Pop(logger); err != nil || p.Pod != hpp1 {
t.Errorf("Expected: %v after Pop, but got: %v", hpp1, p.Pod.Name)
}
unschedulableQueuedPodInfo := q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin")
@ -1553,11 +1554,11 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m))
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below.
q.activeQ.Add(q.newQueuedPodInfo(unschedulablePodInfo.Pod))
if p, err := q.Pop(); err != nil || p.Pod != unschedulablePodInfo.Pod {
if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name)
}
q.activeQ.Add(q.newQueuedPodInfo(affinityPod))
if p, err := q.Pop(); err != nil || p.Pod != affinityPod {
if p, err := q.Pop(logger); err != nil || p.Pod != affinityPod {
t.Errorf("Expected: %v after Pop, but got: %v", affinityPod.Name, p.Pod.Name)
}
q.Add(logger, medPriorityPodInfo.Pod)
@ -1596,7 +1597,7 @@ func TestPriorityQueue_NominatedPodsForNode(t *testing.T) {
q.Add(logger, medPriorityPodInfo.Pod)
q.Add(logger, unschedulablePodInfo.Pod)
q.Add(logger, highPriorityPodInfo.Pod)
if p, err := q.Pop(); err != nil || p.Pod != highPriorityPodInfo.Pod {
if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name)
}
expectedList := []*framework.PodInfo{medPriorityPodInfo, unschedulablePodInfo}
@ -1681,11 +1682,11 @@ func TestPriorityQueue_PendingPods(t *testing.T) {
q := NewTestQueue(ctx, newDefaultQueueSort())
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below.
q.activeQ.Add(q.newQueuedPodInfo(unschedulablePodInfo.Pod))
if p, err := q.Pop(); err != nil || p.Pod != unschedulablePodInfo.Pod {
if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name)
}
q.activeQ.Add(q.newQueuedPodInfo(highPriorityPodInfo.Pod))
if p, err := q.Pop(); err != nil || p.Pod != highPriorityPodInfo.Pod {
if p, err := q.Pop(logger); err != nil || p.Pod != highPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name)
}
q.Add(logger, medPriorityPodInfo.Pod)
@ -1748,7 +1749,7 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
if diff := cmp.Diff(q.nominator, expectedNominatedPods, nominatorCmpOpts...); diff != "" {
t.Errorf("Unexpected diff after adding pods (-want, +got):\n%s", diff)
}
if p, err := q.Pop(); err != nil || p.Pod != medPriorityPodInfo.Pod {
if p, err := q.Pop(logger); err != nil || p.Pod != medPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name)
}
// List of nominated pods shouldn't change after popping them from the queue.
@ -1935,14 +1936,15 @@ func TestUnschedulablePodsMap(t *testing.T) {
}
func TestSchedulingQueue_Close(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
q := NewTestQueue(ctx, newDefaultQueueSort())
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
pod, err := q.Pop()
pod, err := q.Pop(logger)
if err != nil {
t.Errorf("Expected nil err from Pop() if queue is closed, but got %q", err.Error())
}
@ -1972,7 +1974,7 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) {
c.Step(time.Microsecond)
// Simulate a pod being popped by the scheduler, determined unschedulable, and
// then moved back to the active queue.
p1, err := q.Pop()
p1, err := q.Pop(logger)
if err != nil {
t.Errorf("Error while popping the head of the queue: %v", err)
}
@ -1996,7 +1998,7 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) {
// Simulation is over. Now let's pop all pods. The pod popped first should be
// the last one we pop here.
for i := 0; i < 5; i++ {
p, err := q.Pop()
p, err := q.Pop(logger)
if err != nil {
t.Errorf("Error while popping pods from the queue: %v", err)
}
@ -2032,7 +2034,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below.
q.activeQ.Add(q.newQueuedPodInfo(unschedulablePod))
if p, err := q.Pop(); err != nil || p.Pod != unschedulablePod {
if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Pod.Name)
}
// Put in the unschedulable queue
@ -2047,7 +2049,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
// Simulate a pod being popped by the scheduler,
// At this time, unschedulable pod should be popped.
p1, err := q.Pop()
p1, err := q.Pop(logger)
if err != nil {
t.Errorf("Error while popping the head of the queue: %v", err)
}
@ -2080,7 +2082,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
// At this time, newerPod should be popped
// because it is the oldest tried pod.
p2, err2 := q.Pop()
p2, err2 := q.Pop(logger)
if err2 != nil {
t.Errorf("Error while popping the head of the queue: %v", err2)
}
@ -2103,7 +2105,7 @@ func TestHighPriorityBackoff(t *testing.T) {
q.Add(logger, highPod)
// Simulate a pod being popped by the scheduler, determined unschedulable, and
// then moved back to the active queue.
p, err := q.Pop()
p, err := q.Pop(logger)
if err != nil {
t.Errorf("Error while popping the head of the queue: %v", err)
}
@ -2125,7 +2127,7 @@ func TestHighPriorityBackoff(t *testing.T) {
// Move all unschedulable pods to the active queue.
q.MoveAllToActiveOrBackoffQueue(logger, TestEvent, nil, nil, nil)
p, err = q.Pop()
p, err = q.Pop(logger)
if err != nil {
t.Errorf("Error while popping the head of the queue: %v", err)
}
@ -2170,11 +2172,11 @@ func TestHighPriorityFlushUnschedulablePodsLeftover(t *testing.T) {
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below.
q.activeQ.Add(q.newQueuedPodInfo(highPod))
if p, err := q.Pop(); err != nil || p.Pod != highPod {
if p, err := q.Pop(logger); err != nil || p.Pod != highPod {
t.Errorf("Expected: %v after Pop, but got: %v", highPod.Name, p.Pod.Name)
}
q.activeQ.Add(q.newQueuedPodInfo(midPod))
if p, err := q.Pop(); err != nil || p.Pod != midPod {
if p, err := q.Pop(logger); err != nil || p.Pod != midPod {
t.Errorf("Expected: %v after Pop, but got: %v", midPod.Name, p.Pod.Name)
}
err := q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(highPod, "fakePlugin"), q.SchedulingCycle())
@ -2188,10 +2190,10 @@ func TestHighPriorityFlushUnschedulablePodsLeftover(t *testing.T) {
c.Step(DefaultPodMaxInUnschedulablePodsDuration + time.Second)
q.flushUnschedulablePodsLeftover(logger)
if p, err := q.Pop(); err != nil || p.Pod != highPod {
if p, err := q.Pop(logger); err != nil || p.Pod != highPod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name)
}
if p, err := q.Pop(); err != nil || p.Pod != midPod {
if p, err := q.Pop(logger); err != nil || p.Pod != midPod {
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name)
}
}
@ -2296,7 +2298,7 @@ var (
if err := queue.activeQ.Add(queue.newQueuedPodInfo(pInfo.Pod)); err != nil {
t.Fatalf("Unexpected error during Add: %v", err)
}
p, err := queue.Pop()
p, err := queue.Pop(logger)
if err != nil {
t.Fatalf("Unexpected error during Pop: %v", err)
}
@ -2314,7 +2316,7 @@ var (
if err := queue.activeQ.Add(queue.newQueuedPodInfo(pInfo.Pod)); err != nil {
t.Fatalf("Unexpected error during Add: %v", err)
}
p, err := queue.Pop()
p, err := queue.Pop(logger)
if err != nil {
t.Fatalf("Unexpected error during Pop: %v", err)
}
@ -2770,7 +2772,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) {
name: "pod is created and scheduled after 2 attempts",
perPodSchedulingMetricsScenario: func(c *testingclock.FakeClock, queue *PriorityQueue, pod *v1.Pod) {
queue.Add(logger, pod)
pInfo, err := queue.Pop()
pInfo, err := queue.Pop(logger)
if err != nil {
t.Fatalf("Failed to pop a pod %v", err)
}
@ -2790,7 +2792,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) {
name: "pod is created and scheduled after 2 attempts but before the second pop, call update",
perPodSchedulingMetricsScenario: func(c *testingclock.FakeClock, queue *PriorityQueue, pod *v1.Pod) {
queue.Add(logger, pod)
pInfo, err := queue.Pop()
pInfo, err := queue.Pop(logger)
if err != nil {
t.Fatalf("Failed to pop a pod %v", err)
}
@ -2841,7 +2843,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) {
queue := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c))
test.perPodSchedulingMetricsScenario(c, queue, pod)
podInfo, err := queue.Pop()
podInfo, err := queue.Pop(logger)
if err != nil {
t.Fatal(err)
}
@ -2978,7 +2980,7 @@ func TestBackOffFlow(t *testing.T) {
t.Run(fmt.Sprintf("step %d", i), func(t *testing.T) {
timestamp := cl.Now()
// Simulate schedule attempt.
podInfo, err := q.Pop()
podInfo, err := q.Pop(logger)
if err != nil {
t.Fatal(err)
}
@ -3073,7 +3075,7 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) {
for i, podInfo := range tt.podInfos {
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent() below.
q.activeQ.Add(q.newQueuedPodInfo(podInfo.Pod))
if p, err := q.Pop(); err != nil || p.Pod != podInfo.Pod {
if p, err := q.Pop(logger); err != nil || p.Pod != podInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", podInfo.Pod.Name, p.Pod.Name)
}
podInfo.UnschedulablePlugins = sets.New("plugin")

View File

@ -66,7 +66,7 @@ const (
// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne(ctx context.Context) {
logger := klog.FromContext(ctx)
podInfo, err := sched.NextPod()
podInfo, err := sched.NextPod(logger)
if err != nil {
logger.Error(err, "Error while retrieving next pod from scheduling queue")
return

View File

@ -767,7 +767,7 @@ func TestSchedulerScheduleOne(t *testing.T) {
sched := &Scheduler{
Cache: cache,
client: client,
NextPod: func() (*framework.QueuedPodInfo, error) {
NextPod: func(logger klog.Logger) (*framework.QueuedPodInfo, error) {
return &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(t, item.sendPod)}, nil
},
SchedulingQueue: internalqueue.NewTestQueue(ctx, nil),
@ -3413,7 +3413,7 @@ func setupTestScheduler(ctx context.Context, t *testing.T, queuedPodStore *clien
client: client,
nodeInfoSnapshot: internalcache.NewEmptySnapshot(),
percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
NextPod: func() (*framework.QueuedPodInfo, error) {
NextPod: func(logger klog.Logger) (*framework.QueuedPodInfo, error) {
return &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(t, clientcache.Pop(queuedPodStore).(*v1.Pod))}, nil
},
SchedulingQueue: schedulingQueue,

View File

@ -72,7 +72,7 @@ type Scheduler struct {
// is available. We don't use a channel for this, because scheduling
// a pod may take some amount of time and we don't want pods to get
// stale while they sit in a channel.
NextPod func() (*framework.QueuedPodInfo, error)
NextPod func(logger klog.Logger) (*framework.QueuedPodInfo, error)
// FailureHandler is called upon a scheduling failure.
FailureHandler FailureHandlerFn

View File

@ -282,8 +282,13 @@ func TestFailureHandler(t *testing.T) {
queue := internalqueue.NewPriorityQueue(nil, informerFactory, internalqueue.WithClock(testingclock.NewFakeClock(time.Now())))
schedulerCache := internalcache.New(ctx, 30*time.Second)
queue.Add(logger, testPod)
queue.Pop()
if err := queue.Add(logger, testPod); err != nil {
t.Fatalf("Add failed: %v", err)
}
if _, err := queue.Pop(logger); err != nil {
t.Fatalf("Pop failed: %v", err)
}
if tt.podUpdatedDuringScheduling {
podInformer.Informer().GetStore().Update(testPodUpdated)
@ -304,7 +309,7 @@ func TestFailureHandler(t *testing.T) {
var got *v1.Pod
if tt.podUpdatedDuringScheduling {
head, e := queue.Pop()
head, e := queue.Pop(logger)
if e != nil {
t.Fatalf("Cannot pop pod from the activeQ: %v", e)
}

View File

@ -24,6 +24,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
"k8s.io/klog/v2"
extenderv1 "k8s.io/kube-scheduler/extender/v1"
"k8s.io/kubernetes/pkg/scheduler/framework"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
@ -191,10 +192,13 @@ func (f *FakeExtender) ProcessPreemption(
nodeNameToVictimsCopy[k] = v
}
// If Extender.ProcessPreemption ever gets extended with a context parameter, then the logger should be retrieved from that.
// Now, in order not to modify the Extender interface, we get the logger from klog.TODO()
logger := klog.TODO()
for nodeName, victims := range nodeNameToVictimsCopy {
// Try to do preemption on extender side.
nodeInfo, _ := nodeInfos.Get(nodeName)
extenderVictimPods, extenderPDBViolations, fits, err := f.selectVictimsOnNodeByExtender(pod, nodeInfo.Node())
extenderVictimPods, extenderPDBViolations, fits, err := f.selectVictimsOnNodeByExtender(logger, pod, nodeInfo.Node())
if err != nil {
return nil, err
}
@ -216,7 +220,7 @@ func (f *FakeExtender) ProcessPreemption(
// 1. More victim pods (if any) amended by preemption phase of extender.
// 2. Number of violating victim (used to calculate PDB).
// 3. Fits or not after preemption phase on extender's side.
func (f *FakeExtender) selectVictimsOnNodeByExtender(pod *v1.Pod, node *v1.Node) ([]*v1.Pod, int, bool, error) {
func (f *FakeExtender) selectVictimsOnNodeByExtender(logger klog.Logger, pod *v1.Pod, node *v1.Node) ([]*v1.Pod, int, bool, error) {
// If a extender support preemption but have no cached node info, let's run filter to make sure
// default scheduler's decision still stand with given pod and node.
if !f.NodeCacheCapable {
@ -236,8 +240,8 @@ func (f *FakeExtender) selectVictimsOnNodeByExtender(pod *v1.Pod, node *v1.Node)
var potentialVictims []*v1.Pod
removePod := func(rp *v1.Pod) {
nodeInfoCopy.RemovePod(rp)
removePod := func(rp *v1.Pod) error {
return nodeInfoCopy.RemovePod(logger, rp)
}
addPod := func(ap *v1.Pod) {
nodeInfoCopy.AddPod(ap)
@ -248,7 +252,9 @@ func (f *FakeExtender) selectVictimsOnNodeByExtender(pod *v1.Pod, node *v1.Node)
for _, p := range nodeInfoCopy.Pods {
if corev1helpers.PodPriority(p.Pod) < podPriority {
potentialVictims = append(potentialVictims, p.Pod)
removePod(p.Pod)
if err := removePod(p.Pod); err != nil {
return nil, 0, false, err
}
}
}
sort.Slice(potentialVictims, func(i, j int) bool { return util.MoreImportantPod(potentialVictims[i], potentialVictims[j]) })
@ -275,7 +281,9 @@ func (f *FakeExtender) selectVictimsOnNodeByExtender(pod *v1.Pod, node *v1.Node)
addPod(p)
status := f.runPredicate(pod, nodeInfoCopy.Node())
if !status.IsSuccess() {
removePod(p)
if err := removePod(p); err != nil {
return false
}
victims = append(victims, p)
}
return status.IsSuccess()

View File

@ -1521,8 +1521,8 @@ func initTestPreferNominatedNode(t *testing.T, nsPrefix string, opts ...schedule
testutils.SyncSchedulerInformerFactory(testCtx)
// wraps the NextPod() method to make it appear the preemption has been done already and the nominated node has been set.
f := testCtx.Scheduler.NextPod
testCtx.Scheduler.NextPod = func() (*framework.QueuedPodInfo, error) {
podInfo, _ := f()
testCtx.Scheduler.NextPod = func(logger klog.Logger) (*framework.QueuedPodInfo, error) {
podInfo, _ := f(klog.FromContext(testCtx.Ctx))
// Scheduler.Next() may return nil when scheduler is shutting down.
if podInfo != nil {
podInfo.Pod.Status.NominatedNodeName = "node-1"

View File

@ -1145,10 +1145,11 @@ func NextPodOrDie(t *testing.T, testCtx *TestContext) *schedulerframework.Queued
t.Helper()
var podInfo *schedulerframework.QueuedPodInfo
logger := klog.FromContext(testCtx.Ctx)
// NextPod() is a blocking operation. Wrap it in timeout() to avoid relying on
// default go testing timeout (10m) to abort.
if err := timeout(testCtx.Ctx, time.Second*5, func() {
podInfo, _ = testCtx.Scheduler.NextPod()
podInfo, _ = testCtx.Scheduler.NextPod(logger)
}); err != nil {
t.Fatalf("Timed out waiting for the Pod to be popped: %v", err)
}
@ -1160,10 +1161,11 @@ func NextPod(t *testing.T, testCtx *TestContext) *schedulerframework.QueuedPodIn
t.Helper()
var podInfo *schedulerframework.QueuedPodInfo
logger := klog.FromContext(testCtx.Ctx)
// NextPod() is a blocking operation. Wrap it in timeout() to avoid relying on
// default go testing timeout (10m) to abort.
if err := timeout(testCtx.Ctx, time.Second*5, func() {
podInfo, _ = testCtx.Scheduler.NextPod()
podInfo, _ = testCtx.Scheduler.NextPod(logger)
}); err != nil {
return nil
}