Merge pull request #126962 from sanposhiho/memory-leak-scheduler

fix(scheduler): fix a possible memory leak for QueueingHint
commit f12334be03
Authored by Kubernetes Prow Robot on 2024-09-06 19:01:25 +01:00; committed by GitHub (GPG Key ID: B5690EEEBB952194, no known key found for this signature in database)
3 changed files with 116 additions and 92 deletions
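Roughly, the leak works like this: with the SchedulerQueueingHints feature gate on, the queue tracks each popped pod as "in flight" and records cluster events seen while it is in flight so queueing hints can evaluate them later; Done(pod.UID) is what releases that state. The toy tracker below is a simplified sketch of that bookkeeping, not the real PriorityQueue:

// inflight_sketch.go: simplified stand-in for the queue's in-flight bookkeeping
// (not the real PriorityQueue). Popping a pod starts tracking it, events observed
// meanwhile are recorded per in-flight pod, and only done() releases both records.
package main

import "fmt"

type tracker struct {
    inFlightPods   map[string]struct{}
    inFlightEvents map[string][]string
}

func newTracker() *tracker {
    return &tracker{
        inFlightPods:   map[string]struct{}{},
        inFlightEvents: map[string][]string{},
    }
}

// pop marks a pod (by UID) as being scheduled right now.
func (t *tracker) pop(uid string) { t.inFlightPods[uid] = struct{}{} }

// observe records a cluster event for every pod currently in flight.
func (t *tracker) observe(event string) {
    for uid := range t.inFlightPods {
        t.inFlightEvents[uid] = append(t.inFlightEvents[uid], event)
    }
}

// done releases everything recorded for the pod. Any path that pops a pod and
// returns without calling done leaks both map entries, which is the gap the
// Done(pod.UID) calls added to ScheduleOne below are closing.
func (t *tracker) done(uid string) {
    delete(t.inFlightPods, uid)
    delete(t.inFlightEvents, uid)
}

func main() {
    t := newTracker()
    t.pop("pod-a")
    t.observe("NodeAdd")
    t.done("pod-a") // without this, pod-a would accumulate events indefinitely
    fmt.Println(len(t.inFlightPods), len(t.inFlightEvents)) // 0 0
}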


@@ -119,6 +119,7 @@ type SchedulingQueue interface {
AssignedPodUpdated(logger klog.Logger, oldPod, newPod *v1.Pod, event framework.ClusterEvent)
PendingPods() ([]*v1.Pod, string)
PodsInActiveQ() []*v1.Pod
InFlightPods() []*v1.Pod
// Close closes the SchedulingQueue so that the goroutine which is
// waiting to pop items can exit gracefully.
Close()
@@ -836,6 +837,15 @@ func (p *PriorityQueue) Done(pod types.UID) {
p.activeQ.done(pod)
}
func (p *PriorityQueue) InFlightPods() []*v1.Pod {
if !p.isSchedulingQueueHintEnabled {
// do nothing if schedulingQueueHint is disabled.
// In that case, we don't have inFlightPods and inFlightEvents.
return nil
}
return p.activeQ.listInFlightPods()
}
// isPodUpdated checks if the pod is updated in a way that it may have become
// schedulable. It drops status of the pod and compares it with old version,
// except for pod.status.resourceClaimStatuses: changing that may have an

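The new InFlightPods accessor added above exposes the in-flight set so callers, in practice tests, can verify that nothing was left behind; when the feature gate is off it returns nil because no tracking happens. A minimal usage sketch, assuming a scheduling queue named queue and a *testing.T named t inside a test:

// Assumes "queue" is a scheduling queue and "t" is the test's *testing.T.
if leaked := queue.InFlightPods(); len(leaked) != 0 {
    t.Errorf("expected no in-flight pods after the scheduling cycle, got %d", len(leaked))
}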

@@ -87,9 +87,12 @@ func (sched *Scheduler) ScheduleOne(ctx context.Context) {
// This shouldn't happen, because we only accept for scheduling the pods
// which specify a scheduler name that matches one of the profiles.
logger.Error(err, "Error occurred")
sched.SchedulingQueue.Done(pod.UID)
return
}
if sched.skipPodSchedule(ctx, fwk, pod) {
// We don't put this Pod back to the queue, but we have to cleanup the in-flight pods/events.
sched.SchedulingQueue.Done(pod.UID)
return
}

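Both of the returns patched above happen after the pod has already been popped (so in-flight tracking has started) but before any failure handler runs, so nothing ever marked the pod Done. In terms of the toy tracker sketched earlier, the broken sequence was, illustratively:

// Illustrative only, reusing the toy tracker from the first sketch.
t := newTracker()
t.pop("pod-a")         // ScheduleOne pops the pod, tracking starts
// frameworkForPod fails, or skipPodSchedule returns true -> early return
// without done("pod-a"): the pod stays in flight and
t.observe("NodeAdd")   // ...keeps collecting every subsequent cluster event.
t.observe("PodDeleted")
// len(t.inFlightEvents["pod-a"]) == 2 here, and it grows without bound.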

@@ -41,16 +41,19 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
clienttesting "k8s.io/client-go/testing"
clientcache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/events"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/component-helpers/storage/volume"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
extenderv1 "k8s.io/kube-scheduler/extender/v1"
"k8s.io/kubernetes/pkg/features"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
internalcache "k8s.io/kubernetes/pkg/scheduler/backend/cache"
fakecache "k8s.io/kubernetes/pkg/scheduler/backend/cache/fake"
@@ -65,6 +68,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/profile"
st "k8s.io/kubernetes/pkg/scheduler/testing"
tf "k8s.io/kubernetes/pkg/scheduler/testing/framework"
@@ -747,8 +751,11 @@ func TestSchedulerScheduleOne(t *testing.T) {
},
}
for _, qHintEnabled := range []bool{true, false} {
for _, item := range table {
t.Run(item.name, func(t *testing.T) {
t.Run(fmt.Sprintf("[QueueingHint: %v] %s", qHintEnabled, item.name), func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, qHintEnabled)
logger, ctx := ktesting.NewTestContext(t)
var gotError error
var gotPod *v1.Pod
var gotForgetPod *v1.Pod
@@ -776,14 +783,12 @@ func TestSchedulerScheduleOne(t *testing.T) {
gotBinding = action.(clienttesting.CreateAction).GetObject().(*v1.Binding)
return true, gotBinding, item.injectBindError
})
registerPluginFuncs := append(item.registerPluginFuncs,
fwk, err := tf.NewFramework(ctx,
append(item.registerPluginFuncs,
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fwk, err := tf.NewFramework(ctx,
registerPluginFuncs,
),
testSchedulerName,
frameworkruntime.WithClientSet(client),
frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)),
@@ -794,15 +799,16 @@ func TestSchedulerScheduleOne(t *testing.T) {
}
informerFactory := informers.NewSharedInformerFactory(client, 0)
ar := metrics.NewMetricsAsyncRecorder(10, 1*time.Second, ctx.Done())
queue := internalqueue.NewSchedulingQueue(nil, informerFactory, internalqueue.WithMetricsRecorder(*ar))
sched := &Scheduler{
Cache: cache,
client: client,
NextPod: func(logger klog.Logger) (*framework.QueuedPodInfo, error) {
return &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(t, item.sendPod)}, nil
},
SchedulingQueue: internalqueue.NewSchedulingQueue(nil, informerFactory),
NextPod: queue.Pop,
SchedulingQueue: queue,
Profiles: profile.Map{testSchedulerName: fwk},
}
queue.Add(logger, item.sendPod)
sched.SchedulePod = func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error) {
return item.mockResult.result, item.mockResult.err
@@ -813,6 +819,7 @@ func TestSchedulerScheduleOne(t *testing.T) {
msg := truncateMessage(gotError.Error())
fwk.EventRecorder().Eventf(p.Pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
queue.Done(p.Pod.UID)
}
called := make(chan struct{})
stopFunc, err := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
@@ -842,9 +849,13 @@ func TestSchedulerScheduleOne(t *testing.T) {
if diff := cmp.Diff(item.expectBind, gotBinding); diff != "" {
t.Errorf("got binding diff (-want, +got): %s", diff)
}
if len(queue.InFlightPods()) != 0 {
t.Errorf("in-flight pods should be always empty after SchedulingOne. It has %v Pods", len(queue.InFlightPods()))
}
stopFunc()
})
}
}
}
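One detail worth spelling out about the test changes above: the old version stubbed NextPod with a freshly built QueuedPodInfo, so the real queue never tracked the pod and a leak could not have been observed. The updated test wires an actual scheduling queue through both NextPod and SchedulingQueue, adds the pod to it, and asserts at the end that nothing is left in flight. A condensed sketch of that wiring, with unrelated Scheduler fields omitted and relying on the surrounding test's variables (informerFactory, ar, fwk, logger, item, ctx):

// Condensed from the test above; unrelated Scheduler fields omitted.
queue := internalqueue.NewSchedulingQueue(nil, informerFactory, internalqueue.WithMetricsRecorder(*ar))
sched := &Scheduler{
    NextPod:         queue.Pop, // pods come out of the real queue, so in-flight tracking starts on Pop
    SchedulingQueue: queue,     // ...and ScheduleOne's Done calls can release it
    Profiles:        profile.Map{testSchedulerName: fwk},
}
queue.Add(logger, item.sendPod)
sched.ScheduleOne(ctx) // run one scheduling cycle, as the test does
if n := len(queue.InFlightPods()); n != 0 {
    t.Errorf("in-flight pods should be empty after ScheduleOne, got %d", n)
}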
func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {