fix(scheduler): fix a possible memory leak for QueueingHint

This commit is contained in:
Kensei Nakada 2024-08-28 15:23:40 +09:00
parent bce499c136
commit 0b71f256a8
3 changed files with 114 additions and 92 deletions

View File

@ -120,6 +120,7 @@ type SchedulingQueue interface {
AssignedPodUpdated(logger klog.Logger, oldPod, newPod *v1.Pod, event framework.ClusterEvent) AssignedPodUpdated(logger klog.Logger, oldPod, newPod *v1.Pod, event framework.ClusterEvent)
PendingPods() ([]*v1.Pod, string) PendingPods() ([]*v1.Pod, string)
PodsInActiveQ() []*v1.Pod PodsInActiveQ() []*v1.Pod
InFlightPods() []*v1.Pod
// Close closes the SchedulingQueue so that the goroutine which is // Close closes the SchedulingQueue so that the goroutine which is
// waiting to pop items can exit gracefully. // waiting to pop items can exit gracefully.
Close() Close()
@ -837,6 +838,15 @@ func (p *PriorityQueue) Done(pod types.UID) {
p.activeQ.done(pod) p.activeQ.done(pod)
} }
// InFlightPods returns the pods that are currently being processed by the
// scheduling cycle (popped from the queue but not yet Done).
// It returns nil when the SchedulerQueueingHints feature is disabled,
// because in-flight pods/events are only tracked under that feature gate.
func (p *PriorityQueue) InFlightPods() []*v1.Pod {
	if p.isSchedulingQueueHintEnabled {
		return p.activeQ.listInFlightPods()
	}
	// QueueingHint disabled: inFlightPods/inFlightEvents are not maintained.
	return nil
}
// isPodUpdated checks if the pod is updated in a way that it may have become // isPodUpdated checks if the pod is updated in a way that it may have become
// schedulable. It drops status of the pod and compares it with old version, // schedulable. It drops status of the pod and compares it with old version,
// except for pod.status.resourceClaimStatuses: changing that may have an // except for pod.status.resourceClaimStatuses: changing that may have an

View File

@ -87,9 +87,12 @@ func (sched *Scheduler) ScheduleOne(ctx context.Context) {
// This shouldn't happen, because we only accept for scheduling the pods // This shouldn't happen, because we only accept for scheduling the pods
// which specify a scheduler name that matches one of the profiles. // which specify a scheduler name that matches one of the profiles.
logger.Error(err, "Error occurred") logger.Error(err, "Error occurred")
sched.SchedulingQueue.Done(pod.UID)
return return
} }
if sched.skipPodSchedule(ctx, fwk, pod) { if sched.skipPodSchedule(ctx, fwk, pod) {
// We don't put this Pod back to the queue, but we have to cleanup the in-flight pods/events.
sched.SchedulingQueue.Done(pod.UID)
return return
} }

View File

@ -41,16 +41,19 @@ import (
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake" clientsetfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
clienttesting "k8s.io/client-go/testing" clienttesting "k8s.io/client-go/testing"
clientcache "k8s.io/client-go/tools/cache" clientcache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/events" "k8s.io/client-go/tools/events"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/component-helpers/storage/volume" "k8s.io/component-helpers/storage/volume"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting" "k8s.io/klog/v2/ktesting"
extenderv1 "k8s.io/kube-scheduler/extender/v1" extenderv1 "k8s.io/kube-scheduler/extender/v1"
"k8s.io/kubernetes/pkg/features"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
internalcache "k8s.io/kubernetes/pkg/scheduler/backend/cache" internalcache "k8s.io/kubernetes/pkg/scheduler/backend/cache"
fakecache "k8s.io/kubernetes/pkg/scheduler/backend/cache/fake" fakecache "k8s.io/kubernetes/pkg/scheduler/backend/cache/fake"
@ -747,8 +750,10 @@ func TestSchedulerScheduleOne(t *testing.T) {
}, },
} }
for _, qHintEnabled := range []bool{true, false} {
for _, item := range table { for _, item := range table {
t.Run(item.name, func(t *testing.T) { t.Run(fmt.Sprintf("[QueueingHint: %v] %s", qHintEnabled, item.name), func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, qHintEnabled)
var gotError error var gotError error
var gotPod *v1.Pod var gotPod *v1.Pod
var gotForgetPod *v1.Pod var gotForgetPod *v1.Pod
@ -776,14 +781,13 @@ func TestSchedulerScheduleOne(t *testing.T) {
gotBinding = action.(clienttesting.CreateAction).GetObject().(*v1.Binding) gotBinding = action.(clienttesting.CreateAction).GetObject().(*v1.Binding)
return true, gotBinding, item.injectBindError return true, gotBinding, item.injectBindError
}) })
registerPluginFuncs := append(item.registerPluginFuncs,
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
fwk, err := tf.NewFramework(ctx, fwk, err := tf.NewFramework(ctx,
registerPluginFuncs, append(item.registerPluginFuncs,
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
),
testSchedulerName, testSchedulerName,
frameworkruntime.WithClientSet(client), frameworkruntime.WithClientSet(client),
frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)), frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)),
@ -794,15 +798,15 @@ func TestSchedulerScheduleOne(t *testing.T) {
} }
informerFactory := informers.NewSharedInformerFactory(client, 0) informerFactory := informers.NewSharedInformerFactory(client, 0)
queue := internalqueue.NewSchedulingQueue(nil, informerFactory)
sched := &Scheduler{ sched := &Scheduler{
Cache: cache, Cache: cache,
client: client, client: client,
NextPod: func(logger klog.Logger) (*framework.QueuedPodInfo, error) { NextPod: queue.Pop,
return &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(t, item.sendPod)}, nil SchedulingQueue: queue,
},
SchedulingQueue: internalqueue.NewSchedulingQueue(nil, informerFactory),
Profiles: profile.Map{testSchedulerName: fwk}, Profiles: profile.Map{testSchedulerName: fwk},
} }
queue.Add(klog.Logger{}, item.sendPod)
sched.SchedulePod = func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error) { sched.SchedulePod = func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error) {
return item.mockResult.result, item.mockResult.err return item.mockResult.result, item.mockResult.err
@ -813,6 +817,7 @@ func TestSchedulerScheduleOne(t *testing.T) {
msg := truncateMessage(gotError.Error()) msg := truncateMessage(gotError.Error())
fwk.EventRecorder().Eventf(p.Pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg) fwk.EventRecorder().Eventf(p.Pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
queue.Done(p.Pod.UID)
} }
called := make(chan struct{}) called := make(chan struct{})
stopFunc, err := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) { stopFunc, err := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
@ -842,10 +847,14 @@ func TestSchedulerScheduleOne(t *testing.T) {
if diff := cmp.Diff(item.expectBind, gotBinding); diff != "" { if diff := cmp.Diff(item.expectBind, gotBinding); diff != "" {
t.Errorf("got binding diff (-want, +got): %s", diff) t.Errorf("got binding diff (-want, +got): %s", diff)
} }
if len(queue.InFlightPods()) != 0 {
t.Errorf("in-flight pods should be always empty after SchedulingOne. It has %v Pods", len(queue.InFlightPods()))
}
stopFunc() stopFunc()
}) })
} }
} }
}
func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) { func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t) logger, ctx := ktesting.NewTestContext(t)