change the QueueingHintFn to pass a logger

carlory 2023-07-12 10:24:35 +08:00
parent be13c6a884
commit 0599b3caa0
8 changed files with 46 additions and 53 deletions

View File

@@ -217,20 +217,10 @@ type dynamicResources struct {
claimLister resourcev1alpha2listers.ResourceClaimLister
classLister resourcev1alpha2listers.ResourceClassLister
podSchedulingContextLister resourcev1alpha2listers.PodSchedulingContextLister
// logger is only meant to be used by background activities which don't
// have some other logger in their parent callstack.
logger klog.Logger
}
// New initializes a new plugin and returns it.
func New(plArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) {
// TODO: the runtime should set up logging for each plugin, including
// adding a name for each one (same as in kube-controller-manager).
return NewWithLogger(klog.TODO(), plArgs, fh, fts)
}
func NewWithLogger(logger klog.Logger, plArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) {
if !fts.EnableDynamicResourceAllocation {
// Disabled, won't do anything.
return &dynamicResources{}, nil
@@ -243,7 +233,6 @@ func NewWithLogger(logger klog.Logger, plArgs runtime.Object, fh framework.Handl
claimLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Lister(),
classLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClasses().Lister(),
podSchedulingContextLister: fh.SharedInformerFactory().Resource().V1alpha2().PodSchedulingContexts().Lister(),
logger: logger,
}, nil
}
@@ -294,7 +283,7 @@ func (pl *dynamicResources) PreEnqueue(ctx context.Context, pod *v1.Pod) (status
// an informer. It checks whether that change made a previously unschedulable
// pod schedulable. It errs on the side of letting a pod scheduling attempt
// happen.
func (pl *dynamicResources) isSchedulableAfterClaimChange(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
func (pl *dynamicResources) isSchedulableAfterClaimChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
if newObj == nil {
// Deletes don't make a pod schedulable.
return framework.QueueSkip
@@ -303,7 +292,7 @@ func (pl *dynamicResources) isSchedulableAfterClaimChange(pod *v1.Pod, oldObj, n
_, modifiedClaim, err := schedutil.As[*resourcev1alpha2.ResourceClaim](nil, newObj)
if err != nil {
// Shouldn't happen.
pl.logger.Error(err, "unexpected new object in isSchedulableAfterClaimChange")
logger.Error(err, "unexpected new object in isSchedulableAfterClaimChange")
return framework.QueueAfterBackoff
}
@@ -316,18 +305,18 @@ func (pl *dynamicResources) isSchedulableAfterClaimChange(pod *v1.Pod, oldObj, n
// This is not an unexpected error: we know that
// foreachPodResourceClaim only returns errors for "not
// schedulable".
pl.logger.V(4).Info("pod is not schedulable", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim), "reason", err.Error())
logger.V(4).Info("pod is not schedulable", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim), "reason", err.Error())
return framework.QueueSkip
}
if !usesClaim {
// This was not the claim the pod was waiting for.
pl.logger.V(6).Info("unrelated claim got modified", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
logger.V(6).Info("unrelated claim got modified", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
return framework.QueueSkip
}
if oldObj == nil {
pl.logger.V(4).Info("claim for pod got created", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
logger.V(4).Info("claim for pod got created", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
return framework.QueueImmediately
}
@@ -338,20 +327,20 @@ func (pl *dynamicResources) isSchedulableAfterClaimChange(pod *v1.Pod, oldObj, n
originalClaim, ok := oldObj.(*resourcev1alpha2.ResourceClaim)
if !ok {
// Shouldn't happen.
pl.logger.Error(nil, "unexpected old object in isSchedulableAfterClaimAddOrUpdate", "obj", oldObj)
logger.Error(nil, "unexpected old object in isSchedulableAfterClaimAddOrUpdate", "obj", oldObj)
return framework.QueueAfterBackoff
}
if apiequality.Semantic.DeepEqual(&originalClaim.Status, &modifiedClaim.Status) {
if loggerV := pl.logger.V(7); loggerV.Enabled() {
if loggerV := logger.V(7); loggerV.Enabled() {
// Log more information.
loggerV.Info("claim for pod got modified where the pod doesn't care", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim), "diff", cmp.Diff(originalClaim, modifiedClaim))
} else {
pl.logger.V(6).Info("claim for pod got modified where the pod doesn't care", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
logger.V(6).Info("claim for pod got modified where the pod doesn't care", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
}
return framework.QueueSkip
}
pl.logger.V(4).Info("status of claim for pod got updated", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
logger.V(4).Info("status of claim for pod got updated", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
return framework.QueueImmediately
}
@@ -360,24 +349,24 @@ func (pl *dynamicResources) isSchedulableAfterClaimChange(pod *v1.Pod, oldObj, n
// change made a previously unschedulable pod schedulable (updated) or a new
// attempt is needed to re-create the object (deleted). It errs on the side of
// letting a pod scheduling attempt happen.
func (pl *dynamicResources) isSchedulableAfterPodSchedulingContextChange(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
func (pl *dynamicResources) isSchedulableAfterPodSchedulingContextChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
// Deleted? That can happen because we ourselves delete the PodSchedulingContext while
// working on the pod. This can be ignored.
if oldObj != nil && newObj == nil {
pl.logger.V(4).Info("PodSchedulingContext got deleted")
logger.V(4).Info("PodSchedulingContext got deleted")
return framework.QueueSkip
}
oldPodScheduling, newPodScheduling, err := schedutil.As[*resourcev1alpha2.PodSchedulingContext](oldObj, newObj)
if err != nil {
// Shouldn't happen.
pl.logger.Error(nil, "isSchedulableAfterPodSchedulingChange")
logger.Error(nil, "isSchedulableAfterPodSchedulingChange")
return framework.QueueAfterBackoff
}
podScheduling := newPodScheduling // Never nil because deletes are handled above.
if podScheduling.Name != pod.Name || podScheduling.Namespace != pod.Namespace {
pl.logger.V(7).Info("PodSchedulingContext for unrelated pod got modified", "pod", klog.KObj(pod), "podScheduling", klog.KObj(podScheduling))
logger.V(7).Info("PodSchedulingContext for unrelated pod got modified", "pod", klog.KObj(pod), "podScheduling", klog.KObj(podScheduling))
return framework.QueueSkip
}
@@ -397,7 +386,7 @@ func (pl *dynamicResources) isSchedulableAfterPodSchedulingContextChange(pod *v1
// This is not an unexpected error: we know that
// foreachPodResourceClaim only returns errors for "not
// schedulable".
pl.logger.V(4).Info("pod is not schedulable, keep waiting", "pod", klog.KObj(pod), "reason", err.Error())
logger.V(4).Info("pod is not schedulable, keep waiting", "pod", klog.KObj(pod), "reason", err.Error())
return framework.QueueSkip
}
@@ -407,10 +396,10 @@ func (pl *dynamicResources) isSchedulableAfterPodSchedulingContextChange(pod *v1
// potential nodes list. But pod scheduling attempts are
// expensive and doing them too often causes the pod to enter
// backoff. Let's wait instead for all drivers to reply.
if loggerV := pl.logger.V(6); loggerV.Enabled() {
if loggerV := logger.V(6); loggerV.Enabled() {
loggerV.Info("PodSchedulingContext with missing resource claim information, keep waiting", "pod", klog.KObj(pod), "podSchedulingDiff", cmp.Diff(oldPodScheduling, podScheduling))
} else {
pl.logger.V(5).Info("PodSchedulingContext with missing resource claim information, keep waiting", "pod", klog.KObj(pod))
logger.V(5).Info("PodSchedulingContext with missing resource claim information, keep waiting", "pod", klog.KObj(pod))
}
return framework.QueueSkip
}
@@ -418,7 +407,7 @@ func (pl *dynamicResources) isSchedulableAfterPodSchedulingContextChange(pod *v1
if oldPodScheduling == nil /* create */ ||
len(oldPodScheduling.Status.ResourceClaims) < len(podScheduling.Status.ResourceClaims) /* new information and not incomplete (checked above) */ {
// This definitely is new information for the scheduler. Try again immediately.
pl.logger.V(4).Info("PodSchedulingContext for pod has all required information, schedule immediately", "pod", klog.KObj(pod))
logger.V(4).Info("PodSchedulingContext for pod has all required information, schedule immediately", "pod", klog.KObj(pod))
return framework.QueueImmediately
}
@@ -443,7 +432,7 @@ func (pl *dynamicResources) isSchedulableAfterPodSchedulingContextChange(pod *v1
if podScheduling.Spec.SelectedNode != "" {
for _, claimStatus := range podScheduling.Status.ResourceClaims {
if sliceContains(claimStatus.UnsuitableNodes, podScheduling.Spec.SelectedNode) {
pl.logger.V(5).Info("PodSchedulingContext has unsuitable selected node, schedule immediately", "pod", klog.KObj(pod), "selectedNode", podScheduling.Spec.SelectedNode, "podResourceName", claimStatus.Name)
logger.V(5).Info("PodSchedulingContext has unsuitable selected node, schedule immediately", "pod", klog.KObj(pod), "selectedNode", podScheduling.Spec.SelectedNode, "podResourceName", claimStatus.Name)
return framework.QueueImmediately
}
}
@@ -453,7 +442,7 @@ func (pl *dynamicResources) isSchedulableAfterPodSchedulingContextChange(pod *v1
if oldPodScheduling != nil &&
!apiequality.Semantic.DeepEqual(&oldPodScheduling.Spec, &podScheduling.Spec) &&
apiequality.Semantic.DeepEqual(&oldPodScheduling.Status, &podScheduling.Status) {
pl.logger.V(5).Info("PodSchedulingContext has only the scheduler spec changes, ignore the update", "pod", klog.KObj(pod))
logger.V(5).Info("PodSchedulingContext has only the scheduler spec changes, ignore the update", "pod", klog.KObj(pod))
return framework.QueueSkip
}
@@ -462,10 +451,10 @@ func (pl *dynamicResources) isSchedulableAfterPodSchedulingContextChange(pod *v1
// to handle it and thus return QueueAfterBackoff. This will cause the
// scheduler to treat the event as if no event hint callback had been provided.
// Developers who want to investigate this can enable a diff at log level 6.
if loggerV := pl.logger.V(6); loggerV.Enabled() {
if loggerV := logger.V(6); loggerV.Enabled() {
loggerV.Info("PodSchedulingContext for pod with unknown changes, maybe schedule", "pod", klog.KObj(pod), "podSchedulingDiff", cmp.Diff(oldPodScheduling, podScheduling))
} else {
pl.logger.V(5).Info("PodSchedulingContext for pod with unknown changes, maybe schedule", "pod", klog.KObj(pod))
logger.V(5).Info("PodSchedulingContext for pod with unknown changes, maybe schedule", "pod", klog.KObj(pod))
}
return framework.QueueAfterBackoff
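With the logger field removed from the plugin struct, the hint callbacks above now receive a logger on each invocation from the scheduling queue. As a rough sketch (not part of this diff; the cluster-event constants are assumptions), the methods can be registered directly as QueueingHintFn values:

func (pl *dynamicResources) EventsToRegister() []framework.ClusterEventWithHint {
	// Sketch only: the method values match the updated framework.QueueingHintFn
	// signature, so they can be passed without wrapping in a closure.
	return []framework.ClusterEventWithHint{
		{
			Event:          framework.ClusterEvent{Resource: framework.ResourceClaim, ActionType: framework.Add | framework.Update},
			QueueingHintFn: pl.isSchedulableAfterClaimChange,
		},
		{
			Event:          framework.ClusterEvent{Resource: framework.PodSchedulingContext, ActionType: framework.Add | framework.Update},
			QueueingHintFn: pl.isSchedulableAfterPodSchedulingContextChange,
		},
	}
}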

View File

@@ -762,7 +762,7 @@ func setup(t *testing.T, nodes []*v1.Node, claims []*resourcev1alpha2.ResourceCl
t.Helper()
tc := &testContext{}
logger, ctx := ktesting.NewTestContext(t)
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
t.Cleanup(cancel)
tc.ctx = ctx
@@ -782,7 +782,7 @@ func setup(t *testing.T, nodes []*v1.Node, claims []*resourcev1alpha2.ResourceCl
t.Fatal(err)
}
pl, err := NewWithLogger(logger, nil, fh, feature.Features{EnableDynamicResourceAllocation: true})
pl, err := New(nil, fh, feature.Features{EnableDynamicResourceAllocation: true})
if err != nil {
t.Fatal(err)
}
@@ -885,7 +885,7 @@ func createReactor(tracker cgotesting.ObjectTracker) func(action cgotesting.Acti
}
}
func TestClaimChange(t *testing.T) {
func Test_isSchedulableAfterClaimChange(t *testing.T) {
testcases := map[string]struct {
pod *v1.Pod
claims []*resourcev1alpha2.ResourceClaim
@@ -960,6 +960,7 @@ func TestClaimChange(t *testing.T) {
for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
testCtx := setup(t, nil, tc.claims, nil, nil)
if claim, ok := tc.newObj.(*resourcev1alpha2.ResourceClaim); ok {
// Update the informer because the lister gets called and must have the claim.
@@ -970,13 +971,13 @@ func TestClaimChange(t *testing.T) {
require.NoError(t, store.Update(claim))
}
}
actualHint := testCtx.p.isSchedulableAfterClaimChange(tc.pod, tc.oldObj, tc.newObj)
actualHint := testCtx.p.isSchedulableAfterClaimChange(logger, tc.pod, tc.oldObj, tc.newObj)
require.Equal(t, tc.expectedHint, actualHint)
})
}
}
func TestPodSchedulingContextChange(t *testing.T) {
func Test_isSchedulableAfterPodSchedulingContextChange(t *testing.T) {
testcases := map[string]struct {
pod *v1.Pod
schedulings []*resourcev1alpha2.PodSchedulingContext
@@ -1090,8 +1091,9 @@ func TestPodSchedulingContextChange(t *testing.T) {
tc := tc
t.Run(name, func(t *testing.T) {
t.Parallel()
logger, _ := ktesting.NewTestContext(t)
testCtx := setup(t, nil, tc.claims, nil, tc.schedulings)
actualHint := testCtx.p.isSchedulableAfterPodSchedulingContextChange(tc.pod, tc.oldObj, tc.newObj)
actualHint := testCtx.p.isSchedulableAfterPodSchedulingContextChange(logger, tc.pod, tc.oldObj, tc.newObj)
require.Equal(t, tc.expectedHint, actualHint)
})
}

View File

@@ -97,7 +97,7 @@ type ClusterEventWithHint struct {
// - For example, the given event is "Node deleted", the `oldObj` will be that deleted Node.
// - `oldObj` is nil if the event is add event.
// - `newObj` is nil if the event is delete event.
type QueueingHintFn func(pod *v1.Pod, oldObj, newObj interface{}) QueueingHint
type QueueingHintFn func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) QueueingHint
type QueueingHint int
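For illustration, a hint function written against the new signature receives the logger from the scheduling queue rather than capturing one at construction time. A minimal, hypothetical example (the node-add handling and log messages are illustrative only, not taken from any plugin):

// Hypothetical hint function using the logger passed in by the queue.
var isSchedulableAfterNodeAdd framework.QueueingHintFn = func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
	node, ok := newObj.(*v1.Node)
	if !ok {
		// Shouldn't happen; err on the side of retrying the pod.
		logger.Error(nil, "unexpected new object in isSchedulableAfterNodeAdd", "obj", newObj)
		return framework.QueueAfterBackoff
	}
	logger.V(5).Info("node was added, requeue pod", "pod", klog.KObj(pod), "node", klog.KObj(node))
	return framework.QueueImmediately
}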

View File

@@ -389,7 +389,7 @@ func (p *PriorityQueue) isPodWorthRequeuing(logger klog.Logger, pInfo *framework
continue
}
switch h := hintfn.QueueingHintFn(pod, oldObj, newObj); h {
switch h := hintfn.QueueingHintFn(logger, pod, oldObj, newObj); h {
case framework.QueueSkip:
continue
case framework.QueueImmediately:

View File

@@ -83,13 +83,13 @@ var (
cmpopts.IgnoreFields(nominator{}, "podLister", "lock"),
}
queueHintReturnQueueAfterBackoff = func(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
queueHintReturnQueueAfterBackoff = func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
return framework.QueueAfterBackoff
}
queueHintReturnQueueImmediately = func(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
queueHintReturnQueueImmediately = func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
return framework.QueueImmediately
}
queueHintReturnQueueSkip = func(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
queueHintReturnQueueSkip = func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
return framework.QueueSkip
}
)
@@ -2364,15 +2364,15 @@ func mustNewPodInfo(pod *v1.Pod) *framework.PodInfo {
// Test_isPodWorthRequeuing tests isPodWorthRequeuing function.
func Test_isPodWorthRequeuing(t *testing.T) {
count := 0
queueHintReturnQueueImmediately := func(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
queueHintReturnQueueImmediately := func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
count++
return framework.QueueImmediately
}
queueHintReturnQueueSkip := func(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
queueHintReturnQueueSkip := func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
count++
return framework.QueueSkip
}
queueHintReturnQueueAfterBackoff := func(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
queueHintReturnQueueAfterBackoff := func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
count++
return framework.QueueAfterBackoff
}

View File

@@ -361,7 +361,7 @@ func New(ctx context.Context,
// defaultQueueingHintFn is the default queueing hint function.
// It always returns QueueAfterBackoff as the queueing hint.
var defaultQueueingHintFn = func(_ *v1.Pod, _, _ interface{}) framework.QueueingHint {
var defaultQueueingHintFn = func(_ klog.Logger, _ *v1.Pod, _, _ interface{}) framework.QueueingHint {
return framework.QueueAfterBackoff
}
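Events registered without an explicit hint callback presumably fall back to this conservative default, which ignores all of its arguments. An illustrative call (using a background logger, purely as an example):

// hint is framework.QueueAfterBackoff regardless of the arguments passed.
hint := defaultQueueingHintFn(klog.Background(), nil, nil, nil)
_ = hint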

View File

@@ -764,6 +764,7 @@ func Test_buildQueueingHintMap(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
registry := frameworkruntime.Registry{}
cfgPls := &schedulerapi.Plugins{}
plugins := append(tt.plugins, &fakebindPlugin{}, &fakeQueueSortPlugin{})
@@ -808,8 +809,8 @@ func Test_buildQueueingHintMap(t *testing.T) {
t.Errorf("got plugin name %v, want %v", fn.PluginName, wantfns[i].PluginName)
continue
}
if fn.QueueingHintFn(nil, nil, nil) != wantfns[i].QueueingHintFn(nil, nil, nil) {
t.Errorf("got queueing hint function (%v) returning %v, expect it to return %v", fn.PluginName, fn.QueueingHintFn(nil, nil, nil), wantfns[i].QueueingHintFn(nil, nil, nil))
if fn.QueueingHintFn(logger, nil, nil, nil) != wantfns[i].QueueingHintFn(logger, nil, nil, nil) {
t.Errorf("got queueing hint function (%v) returning %v, expect it to return %v", fn.PluginName, fn.QueueingHintFn(logger, nil, nil, nil), wantfns[i].QueueingHintFn(logger, nil, nil, nil))
continue
}
}
@@ -1033,7 +1034,7 @@ var hintFromFakeNode = framework.QueueingHint(100)
type fakeNodePlugin struct{}
var fakeNodePluginQueueingFn = func(_ *v1.Pod, _, _ interface{}) framework.QueueingHint {
var fakeNodePluginQueueingFn = func(_ klog.Logger, _ *v1.Pod, _, _ interface{}) framework.QueueingHint {
return hintFromFakeNode
}
@@ -1053,7 +1054,7 @@ var hintFromFakePod = framework.QueueingHint(101)
type fakePodPlugin struct{}
var fakePodPluginQueueingFn = func(_ *v1.Pod, _, _ interface{}) framework.QueueingHint {
var fakePodPluginQueueingFn = func(_ klog.Logger, _ *v1.Pod, _, _ interface{}) framework.QueueingHint {
return hintFromFakePod
}

View File

@@ -23,6 +23,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/framework"
st "k8s.io/kubernetes/pkg/scheduler/testing"
@@ -69,7 +70,7 @@ func (rp *ReservePlugin) EventsToRegister() []framework.ClusterEventWithHint {
return []framework.ClusterEventWithHint{
{
Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add},
QueueingHintFn: func(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
QueueingHintFn: func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
return framework.QueueImmediately
},
},
@@ -106,7 +107,7 @@ func (pp *PermitPlugin) EventsToRegister() []framework.ClusterEventWithHint {
return []framework.ClusterEventWithHint{
{
Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add},
QueueingHintFn: func(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
QueueingHintFn: func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
return framework.QueueImmediately
},
},