/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scheduler

import (
	"context"
	"fmt"
	"math/rand"
	"strconv"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/dynamic/dynamicinformer"
	"k8s.io/client-go/informers"
	coreinformers "k8s.io/client-go/informers/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	restclient "k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/klog/v2"
	"k8s.io/kube-scheduler/config/v1beta3"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	"k8s.io/kubernetes/pkg/apis/core/validation"
	"k8s.io/kubernetes/pkg/scheduler/apis/config"
	schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
	"k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
	"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
	"k8s.io/kubernetes/pkg/scheduler/metrics"
	"k8s.io/kubernetes/pkg/scheduler/profile"
	"k8s.io/kubernetes/pkg/scheduler/util"
)

const (
	// SchedulerError is the reason recorded for events when an error occurs during scheduling a pod.
	SchedulerError = "SchedulerError"
	// Percentage of plugin metrics to be sampled.
	pluginMetricsSamplePercent = 10
)

// Scheduler watches for new unscheduled pods. It attempts to find
// nodes that they fit on and writes bindings back to the api server.
type Scheduler struct {
	// It is expected that changes made via SchedulerCache will be observed
	// by NodeLister and Algorithm.
	SchedulerCache internalcache.Cache

	Algorithm ScheduleAlgorithm

	Extenders []framework.Extender

	// NextPod should be a function that blocks until the next pod
	// is available. We don't use a channel for this, because scheduling
	// a pod may take some amount of time and we don't want pods to get
	// stale while they sit in a channel.
	NextPod func() *framework.QueuedPodInfo

	// Error is called if there is an error. It is passed the pod in
	// question, and the error
	Error func(*framework.QueuedPodInfo, error)

	// Close this to shut down the scheduler.
	StopEverything <-chan struct{}

	// SchedulingQueue holds pods to be scheduled
	SchedulingQueue internalqueue.SchedulingQueue

	// Profiles are the scheduling profiles.
	Profiles profile.Map

	client clientset.Interface
}

type schedulerOptions struct {
	componentConfigVersion   string
	kubeConfig               *restclient.Config
	percentageOfNodesToScore int32
	podInitialBackoffSeconds int64
	podMaxBackoffSeconds     int64
	// Contains out-of-tree plugins to be merged with the in-tree registry.
	frameworkOutOfTreeRegistry frameworkruntime.Registry
	profiles                   []schedulerapi.KubeSchedulerProfile
	extenders                  []schedulerapi.Extender
	frameworkCapturer          FrameworkCapturer
	parallelism                int32
	applyDefaultProfile        bool
}

// Option configures a Scheduler
type Option func(*schedulerOptions)

// WithComponentConfigVersion sets the component config version to the
// KubeSchedulerConfiguration version used. The string should be the full
// scheme group/version of the external type we converted from (for example
// "kubescheduler.config.k8s.io/v1beta2")
func WithComponentConfigVersion(apiVersion string) Option {
	return func(o *schedulerOptions) {
		o.componentConfigVersion = apiVersion
	}
}

// WithKubeConfig sets the kube config for Scheduler.
func WithKubeConfig(cfg *restclient.Config) Option {
	return func(o *schedulerOptions) {
		o.kubeConfig = cfg
	}
}

// WithProfiles sets profiles for Scheduler. By default, there is one profile
// with the name "default-scheduler".
func WithProfiles(p ...schedulerapi.KubeSchedulerProfile) Option {
	return func(o *schedulerOptions) {
		o.profiles = p
		o.applyDefaultProfile = false
	}
}

// WithParallelism sets the parallelism for all scheduler algorithms. Default is 16.
func WithParallelism(threads int32) Option {
	return func(o *schedulerOptions) {
		o.parallelism = threads
	}
}

// WithPercentageOfNodesToScore sets percentageOfNodesToScore for Scheduler, the default value is 50
func WithPercentageOfNodesToScore(percentageOfNodesToScore int32) Option {
	return func(o *schedulerOptions) {
		o.percentageOfNodesToScore = percentageOfNodesToScore
	}
}

// WithFrameworkOutOfTreeRegistry sets the registry for out-of-tree plugins. Those plugins
// will be appended to the default registry.
func WithFrameworkOutOfTreeRegistry(registry frameworkruntime.Registry) Option {
	return func(o *schedulerOptions) {
		o.frameworkOutOfTreeRegistry = registry
	}
}

// WithPodInitialBackoffSeconds sets podInitialBackoffSeconds for Scheduler, the default value is 1
func WithPodInitialBackoffSeconds(podInitialBackoffSeconds int64) Option {
	return func(o *schedulerOptions) {
		o.podInitialBackoffSeconds = podInitialBackoffSeconds
	}
}

// WithPodMaxBackoffSeconds sets podMaxBackoffSeconds for Scheduler, the default value is 10
func WithPodMaxBackoffSeconds(podMaxBackoffSeconds int64) Option {
	return func(o *schedulerOptions) {
		o.podMaxBackoffSeconds = podMaxBackoffSeconds
	}
}

// WithExtenders sets extenders for the Scheduler
func WithExtenders(e ...schedulerapi.Extender) Option {
	return func(o *schedulerOptions) {
		o.extenders = e
	}
}

// FrameworkCapturer is used for registering a notify function in building framework.
type FrameworkCapturer func(schedulerapi.KubeSchedulerProfile)

// WithBuildFrameworkCapturer sets a notify function for getting buildFramework details.
func WithBuildFrameworkCapturer(fc FrameworkCapturer) Option {
	return func(o *schedulerOptions) {
		o.frameworkCapturer = fc
	}
}
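
// exampleFrameworkCapturer is an illustrative sketch and not part of the
// original file: it shows the kind of callback WithBuildFrameworkCapturer
// accepts. Here it simply logs each profile as its framework is built; the
// function name and log message are assumptions made for the example.
func exampleFrameworkCapturer(prof schedulerapi.KubeSchedulerProfile) {
	klog.InfoS("Building framework for profile", "profile", prof.SchedulerName)
}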

var defaultSchedulerOptions = schedulerOptions{
	percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
	podInitialBackoffSeconds: int64(internalqueue.DefaultPodInitialBackoffDuration.Seconds()),
	podMaxBackoffSeconds:     int64(internalqueue.DefaultPodMaxBackoffDuration.Seconds()),
	parallelism:              int32(parallelize.DefaultParallelism),
	// Ideally we would statically set the default profile here, but we can't because
	// creating the default profile may require testing feature gates, which may get
	// set dynamically in tests. Therefore, we delay creating it until New is actually
	// invoked.
	applyDefaultProfile: true,
}

// New returns a Scheduler
func New(client clientset.Interface,
	informerFactory informers.SharedInformerFactory,
	dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,
	recorderFactory profile.RecorderFactory,
	stopCh <-chan struct{},
	opts ...Option) (*Scheduler, error) {

	stopEverything := stopCh
	if stopEverything == nil {
		stopEverything = wait.NeverStop
	}

	options := defaultSchedulerOptions
	for _, opt := range opts {
		opt(&options)
	}

	if options.applyDefaultProfile {
		var versionedCfg v1beta3.KubeSchedulerConfiguration
		scheme.Scheme.Default(&versionedCfg)
		cfg := config.KubeSchedulerConfiguration{}
		if err := scheme.Scheme.Convert(&versionedCfg, &cfg, nil); err != nil {
			return nil, err
		}
		options.profiles = cfg.Profiles
	}
	schedulerCache := internalcache.New(30*time.Second, stopEverything)

	registry := frameworkplugins.NewInTreeRegistry()
	if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
		return nil, err
	}

	snapshot := internalcache.NewEmptySnapshot()
	clusterEventMap := make(map[framework.ClusterEvent]sets.String)

	configurator := &Configurator{
		componentConfigVersion:   options.componentConfigVersion,
		client:                   client,
		kubeConfig:               options.kubeConfig,
		recorderFactory:          recorderFactory,
		informerFactory:          informerFactory,
		schedulerCache:           schedulerCache,
		StopEverything:           stopEverything,
		percentageOfNodesToScore: options.percentageOfNodesToScore,
		podInitialBackoffSeconds: options.podInitialBackoffSeconds,
		podMaxBackoffSeconds:     options.podMaxBackoffSeconds,
		profiles:                 append([]schedulerapi.KubeSchedulerProfile(nil), options.profiles...),
		registry:                 registry,
		nodeInfoSnapshot:         snapshot,
		extenders:                options.extenders,
		frameworkCapturer:        options.frameworkCapturer,
		parallellism:             options.parallelism,
		clusterEventMap:          clusterEventMap,
	}

	metrics.Register()

	// Create the config from component config
	sched, err := configurator.create()
	if err != nil {
		return nil, fmt.Errorf("couldn't create scheduler: %v", err)
	}

	// Additional tweaks to the config produced by the configurator.
	sched.StopEverything = stopEverything
	sched.client = client

	addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(clusterEventMap))

	return sched, nil
}
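
// exampleNewAndRun is an illustrative sketch and not part of the original file:
// it shows one plausible way to wire New together with the scheduler's informer
// factory and a few of the options above, then start the scheduling loop. The
// function name is an assumption, and the client and recorderFactory are assumed
// to be constructed by the caller.
func exampleNewAndRun(ctx context.Context, client clientset.Interface, recorderFactory profile.RecorderFactory) error {
	informerFactory := NewInformerFactory(client, 0)
	sched, err := New(client,
		informerFactory,
		nil, // dynamic informer factory; assumed safe to omit here since this sketch watches no out-of-tree resources
		recorderFactory,
		ctx.Done(),
		WithPercentageOfNodesToScore(50),
		WithPodInitialBackoffSeconds(1),
		WithPodMaxBackoffSeconds(10),
	)
	if err != nil {
		return err
	}
	// Start the informers and wait for their caches before scheduling anything.
	informerFactory.Start(ctx.Done())
	informerFactory.WaitForCacheSync(ctx.Done())
	sched.Run(ctx) // blocks until ctx is cancelled
	return nil
}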

func unionedGVKs(m map[framework.ClusterEvent]sets.String) map[framework.GVK]framework.ActionType {
	gvkMap := make(map[framework.GVK]framework.ActionType)
	for evt := range m {
		if _, ok := gvkMap[evt.Resource]; ok {
			gvkMap[evt.Resource] |= evt.ActionType
		} else {
			gvkMap[evt.Resource] = evt.ActionType
		}
	}
	return gvkMap
}
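
// exampleUnionedGVKs is an illustrative sketch and not part of the original
// file: it shows how unionedGVKs merges the ActionTypes of cluster events that
// share a resource. Two Pod events (Add and Update) registered by hypothetical
// plugins collapse into a single Pod entry whose ActionType carries both bits.
func exampleUnionedGVKs() map[framework.GVK]framework.ActionType {
	m := map[framework.ClusterEvent]sets.String{
		{Resource: framework.Pod, ActionType: framework.Add}:    sets.NewString("PluginA"),
		{Resource: framework.Pod, ActionType: framework.Update}: sets.NewString("PluginB"),
	}
	// The result maps framework.Pod to framework.Add|framework.Update.
	return unionedGVKs(m)
}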

// Run begins watching and scheduling. It starts scheduling and blocks until the context is done.
func (sched *Scheduler) Run(ctx context.Context) {
	sched.SchedulingQueue.Run()
	wait.UntilWithContext(ctx, sched.scheduleOne, 0)
	sched.SchedulingQueue.Close()
}

// recordSchedulingFailure records an event for the pod that indicates the
// pod has failed to schedule. Also, update the pod condition and nominated node name if set.
func (sched *Scheduler) recordSchedulingFailure(fwk framework.Framework, podInfo *framework.QueuedPodInfo, err error, reason string, nominatedNode string) {
	sched.Error(podInfo, err)

	// Update the scheduling queue with the nominated pod information. Without
	// this, there would be a race condition between the next scheduling cycle
	// and the time the scheduler receives a Pod Update for the nominated pod.
	// Here we check for nil only for tests.
	if sched.SchedulingQueue != nil {
		sched.SchedulingQueue.AddNominatedPod(podInfo.PodInfo, nominatedNode)
	}

	pod := podInfo.Pod
	msg := truncateMessage(err.Error())
	fwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
	if err := updatePod(sched.client, pod, &v1.PodCondition{
		Type:    v1.PodScheduled,
		Status:  v1.ConditionFalse,
		Reason:  reason,
		Message: err.Error(),
	}, nominatedNode); err != nil {
		klog.ErrorS(err, "Error updating pod", "pod", klog.KObj(pod))
	}
}

// truncateMessage truncates a message if it hits the NoteLengthLimit.
func truncateMessage(message string) string {
	max := validation.NoteLengthLimit
	if len(message) <= max {
		return message
	}
	suffix := " ..."
	return message[:max-len(suffix)] + suffix
}

func updatePod(client clientset.Interface, pod *v1.Pod, condition *v1.PodCondition, nominatedNode string) error {
	klog.V(3).InfoS("Updating pod condition", "pod", klog.KObj(pod), "conditionType", condition.Type, "conditionStatus", condition.Status, "conditionReason", condition.Reason)
	podStatusCopy := pod.Status.DeepCopy()
	// NominatedNodeName is updated only if we are trying to set it, and the value is
	// different from the existing one.
	if !podutil.UpdatePodCondition(podStatusCopy, condition) &&
		(len(nominatedNode) == 0 || pod.Status.NominatedNodeName == nominatedNode) {
		return nil
	}
	if nominatedNode != "" {
		podStatusCopy.NominatedNodeName = nominatedNode
	}
	return util.PatchPodStatus(client, pod, podStatusCopy)
}

// assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.
// assume modifies `assumed`.
func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
	// Optimistically assume that the binding will succeed and send it to apiserver
	// in the background.
	// If the binding fails, scheduler will release resources allocated to assumed pod
	// immediately.
	assumed.Spec.NodeName = host

	if err := sched.SchedulerCache.AssumePod(assumed); err != nil {
		klog.ErrorS(err, "Scheduler cache AssumePod failed")
		return err
	}
	// if "assumed" is a nominated pod, we should remove it from internal cache
	if sched.SchedulingQueue != nil {
		sched.SchedulingQueue.DeleteNominatedPodIfExists(assumed)
	}

	return nil
}

// bind binds a pod to a given node defined in a binding object.
// The precedence for binding is: (1) extenders and (2) framework plugins.
// We expect this to run asynchronously, so we handle binding metrics internally.
func (sched *Scheduler) bind(ctx context.Context, fwk framework.Framework, assumed *v1.Pod, targetNode string, state *framework.CycleState) (err error) {
	defer func() {
		sched.finishBinding(fwk, assumed, targetNode, err)
	}()

	bound, err := sched.extendersBinding(assumed, targetNode)
	if bound {
		return err
	}
	bindStatus := fwk.RunBindPlugins(ctx, state, assumed, targetNode)
	if bindStatus.IsSuccess() {
		return nil
	}
	if bindStatus.Code() == framework.Error {
		return bindStatus.AsError()
	}
	return fmt.Errorf("bind status: %s, %v", bindStatus.Code().String(), bindStatus.Message())
}

// TODO(#87159): Move this to a Plugin.
func (sched *Scheduler) extendersBinding(pod *v1.Pod, node string) (bool, error) {
	for _, extender := range sched.Extenders {
		if !extender.IsBinder() || !extender.IsInterested(pod) {
			continue
		}
		return true, extender.Bind(&v1.Binding{
			ObjectMeta: metav1.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name, UID: pod.UID},
			Target:     v1.ObjectReference{Kind: "Node", Name: node},
		})
	}
	return false, nil
}

func (sched *Scheduler) finishBinding(fwk framework.Framework, assumed *v1.Pod, targetNode string, err error) {
	if finErr := sched.SchedulerCache.FinishBinding(assumed); finErr != nil {
		klog.ErrorS(finErr, "Scheduler cache FinishBinding failed")
	}
	if err != nil {
		klog.V(1).InfoS("Failed to bind pod", "pod", klog.KObj(assumed))
		return
	}

	fwk.EventRecorder().Eventf(assumed, nil, v1.EventTypeNormal, "Scheduled", "Binding", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, targetNode)
}

// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne(ctx context.Context) {
	podInfo := sched.NextPod()
	// pod could be nil when schedulerQueue is closed
	if podInfo == nil || podInfo.Pod == nil {
		return
	}
	pod := podInfo.Pod
	fwk, err := sched.frameworkForPod(pod)
	if err != nil {
		// This shouldn't happen, because we only accept for scheduling the pods
		// which specify a scheduler name that matches one of the profiles.
		klog.ErrorS(err, "Error occurred")
		return
	}
	if sched.skipPodSchedule(fwk, pod) {
		return
	}

	klog.V(3).InfoS("Attempting to schedule pod", "pod", klog.KObj(pod))

	// Synchronously attempt to find a fit for the pod.
	start := time.Now()
	state := framework.NewCycleState()
	state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)
	// Initialize an empty podsToActivate struct, which will be filled up by plugins or stay empty.
	podsToActivate := framework.NewPodsToActivate()
	state.Write(framework.PodsToActivateKey, podsToActivate)

	schedulingCycleCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, sched.Extenders, fwk, state, pod)
	if err != nil {
		// Schedule() may have failed because the pod would not fit on any host, so we try to
		// preempt, with the expectation that the next time the pod is tried for scheduling it
		// will fit due to the preemption. It is also possible that a different pod will schedule
		// into the resources that were preempted, but this is harmless.
		nominatedNode := ""
		if fitError, ok := err.(*framework.FitError); ok {
			if !fwk.HasPostFilterPlugins() {
				klog.V(3).InfoS("No PostFilter plugins are registered, so no preemption will be performed")
			} else {
				// Run PostFilter plugins to try to make the pod schedulable in a future scheduling cycle.
				result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatusMap)
				if status.Code() == framework.Error {
					klog.ErrorS(nil, "Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status)
				} else {
					klog.V(5).InfoS("Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status)
				}
				if status.IsSuccess() && result != nil {
					nominatedNode = result.NominatedNodeName
				}
			}
			// Pod did not fit anywhere, so it is counted as a failure. If preemption
			// succeeds, the pod should get counted as a success the next time we try to
			// schedule it. (hopefully)
			metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
		} else if err == ErrNoNodesAvailable {
			// No nodes available is counted as unschedulable rather than an error.
			metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
		} else {
			klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod))
			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
		}
		sched.recordSchedulingFailure(fwk, podInfo, err, v1.PodReasonUnschedulable, nominatedNode)
		return
	}
	metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
	// Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
	// This allows us to keep scheduling without waiting on binding to occur.
	assumedPodInfo := podInfo.DeepCopy()
	assumedPod := assumedPodInfo.Pod
	// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
	err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
	if err != nil {
		metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
		// This is most probably the result of a bug in the retrying logic.
		// We report an error here so that pod scheduling can be retried.
		// This relies on the fact that Error will check if the pod has been bound
		// to a node and if so will not add it back to the unscheduled pods queue
		// (otherwise this would cause an infinite loop).
		sched.recordSchedulingFailure(fwk, assumedPodInfo, err, SchedulerError, "")
		return
	}

	// Run the Reserve method of reserve plugins.
	if sts := fwk.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
		metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
		// trigger un-reserve to clean up state associated with the reserved Pod
		fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
		if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
			klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
		}
		sched.recordSchedulingFailure(fwk, assumedPodInfo, sts.AsError(), SchedulerError, "")
		return
	}

	// Run "permit" plugins.
	runPermitStatus := fwk.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
	if runPermitStatus.Code() != framework.Wait && !runPermitStatus.IsSuccess() {
		var reason string
		if runPermitStatus.IsUnschedulable() {
			metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
			reason = v1.PodReasonUnschedulable
		} else {
			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
			reason = SchedulerError
		}
		// One of the plugins returned status different than success or wait.
		fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
		if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
			klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
		}
		sched.recordSchedulingFailure(fwk, assumedPodInfo, runPermitStatus.AsError(), reason, "")
		return
	}

	// At the end of a successful scheduling cycle, pop and move up Pods if needed.
	if len(podsToActivate.Map) != 0 {
		sched.SchedulingQueue.Activate(podsToActivate.Map)
		// Clear the entries after activation.
		podsToActivate.Map = make(map[string]*v1.Pod)
	}

	// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
	go func() {
		bindingCycleCtx, cancel := context.WithCancel(ctx)
		defer cancel()
		metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Inc()
		defer metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Dec()

		waitOnPermitStatus := fwk.WaitOnPermit(bindingCycleCtx, assumedPod)
		if !waitOnPermitStatus.IsSuccess() {
			var reason string
			if waitOnPermitStatus.IsUnschedulable() {
				metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
				reason = v1.PodReasonUnschedulable
			} else {
				metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
				reason = SchedulerError
			}
			// trigger un-reserve plugins to clean up state associated with the reserved Pod
			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
			if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
				klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
			} else {
				// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
				// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
				// TODO(#103853): de-duplicate the logic.
				// Avoid moving the assumed Pod itself as it's always Unschedulable.
				// It's intentional to "defer" this operation; otherwise MoveAllToActiveOrBackoffQueue() would
				// update `q.moveRequest` and thus move the assumed pod to backoffQ anyways.
				defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, func(pod *v1.Pod) bool {
					return assumedPod.UID != pod.UID
				})
			}
			sched.recordSchedulingFailure(fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, "")
			return
		}

		// Run "prebind" plugins.
		preBindStatus := fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
		if !preBindStatus.IsSuccess() {
			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
			// trigger un-reserve plugins to clean up state associated with the reserved Pod
			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
			if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
				klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
			} else {
				// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
				// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
				// TODO(#103853): de-duplicate the logic.
				sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
			}
			sched.recordSchedulingFailure(fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, "")
			return
		}

		err := sched.bind(bindingCycleCtx, fwk, assumedPod, scheduleResult.SuggestedHost, state)
		if err != nil {
			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
			// trigger un-reserve plugins to clean up state associated with the reserved Pod
			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
			if err := sched.SchedulerCache.ForgetPod(assumedPod); err != nil {
				klog.ErrorS(err, "scheduler cache ForgetPod failed")
			} else {
				// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
				// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
				// TODO(#103853): de-duplicate the logic.
				sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
			}
			sched.recordSchedulingFailure(fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, "")
		} else {
			// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
			if klog.V(2).Enabled() {
				klog.InfoS("Successfully bound pod to node", "pod", klog.KObj(pod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes)
			}
			metrics.PodScheduled(fwk.ProfileName(), metrics.SinceInSeconds(start))
			metrics.PodSchedulingAttempts.Observe(float64(podInfo.Attempts))
			metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(podInfo)).Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp))

			// Run "postbind" plugins.
			fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)

			// At the end of a successful binding cycle, move up Pods if needed.
			if len(podsToActivate.Map) != 0 {
				sched.SchedulingQueue.Activate(podsToActivate.Map)
				// Unlike the logic in scheduling cycle, we don't bother deleting the entries
				// as `podsToActivate.Map` is no longer consumed.
			}
		}
	}()
}

func getAttemptsLabel(p *framework.QueuedPodInfo) string {
	// We break down the pod scheduling duration by attempts capped to a limit
	// to avoid ending up with a high cardinality metric.
	if p.Attempts >= 15 {
		return "15+"
	}
	return strconv.Itoa(p.Attempts)
}
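
// exampleAttemptsLabels is an illustrative sketch and not part of the original
// file: it shows how getAttemptsLabel caps the attempts dimension to keep the
// metric's cardinality bounded (attempts 3 -> "3", attempts 20 -> "15+").
func exampleAttemptsLabels() []string {
	low := &framework.QueuedPodInfo{Attempts: 3}
	high := &framework.QueuedPodInfo{Attempts: 20}
	return []string{getAttemptsLabel(low), getAttemptsLabel(high)}
}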

func (sched *Scheduler) frameworkForPod(pod *v1.Pod) (framework.Framework, error) {
	fwk, ok := sched.Profiles[pod.Spec.SchedulerName]
	if !ok {
		return nil, fmt.Errorf("profile not found for scheduler name %q", pod.Spec.SchedulerName)
	}
	return fwk, nil
}

// skipPodSchedule returns true if we could skip scheduling the pod for specified cases.
func (sched *Scheduler) skipPodSchedule(fwk framework.Framework, pod *v1.Pod) bool {
	// Case 1: pod is being deleted.
	if pod.DeletionTimestamp != nil {
		fwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
		klog.V(3).InfoS("Skip schedule deleting pod", "pod", klog.KObj(pod))
		return true
	}

	// Case 2: pod that has been assumed could be skipped.
	// An assumed pod can be added again to the scheduling queue if it got an update event
	// during its previous scheduling cycle but before getting assumed.
	isAssumed, err := sched.SchedulerCache.IsAssumedPod(pod)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", pod.Namespace, pod.Name, err))
		return false
	}
	return isAssumed
}

// NewInformerFactory creates a SharedInformerFactory and initializes a scheduler specific
// in-place podInformer.
func NewInformerFactory(cs clientset.Interface, resyncPeriod time.Duration) informers.SharedInformerFactory {
	informerFactory := informers.NewSharedInformerFactory(cs, resyncPeriod)
	informerFactory.InformerFor(&v1.Pod{}, newPodInformer)
	return informerFactory
}

// newPodInformer creates a shared index informer that returns only non-terminal pods.
func newPodInformer(cs clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
	selector := fmt.Sprintf("status.phase!=%v,status.phase!=%v", v1.PodSucceeded, v1.PodFailed)
	tweakListOptions := func(options *metav1.ListOptions) {
		options.FieldSelector = selector
	}
	return coreinformers.NewFilteredPodInformer(cs, metav1.NamespaceAll, resyncPeriod, nil, tweakListOptions)
}
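
// examplePodInformerFactory is an illustrative sketch and not part of the
// original file: it shows how the filtered pod informer installed by
// NewInformerFactory is retrieved. Because newPodInformer was registered via
// InformerFor for &v1.Pod{}, the call below returns that same informer, which
// only tracks non-terminal pods. The function name is an assumption.
func examplePodInformerFactory(cs clientset.Interface) cache.SharedIndexInformer {
	informerFactory := NewInformerFactory(cs, 0)
	return informerFactory.Core().V1().Pods().Informer()
}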