/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scheduler

import (
	"context"
	"errors"
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/dynamic/dynamicinformer"
	"k8s.io/client-go/informers"
	coreinformers "k8s.io/client-go/informers/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	restclient "k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/klog/v2"
	configv1 "k8s.io/kube-scheduler/config/v1"
	schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
	"k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
	frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
	cachedebugger "k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger"
	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
	"k8s.io/kubernetes/pkg/scheduler/metrics"
	"k8s.io/kubernetes/pkg/scheduler/profile"
)

const (
	// Duration the scheduler will wait before expiring an assumed pod.
	// See issue #106361 for more details about this parameter and its value.
	durationToExpireAssumedPod time.Duration = 0
)

// ErrNoNodesAvailable is used to describe the error that no nodes are available to schedule pods.
var ErrNoNodesAvailable = fmt.Errorf("no nodes available to schedule pods")

// Scheduler watches for new unscheduled pods. It attempts to find
// nodes that they fit on and writes bindings back to the api server.
type Scheduler struct {
	// It is expected that changes made via Cache will be observed
	// by NodeLister and Algorithm.
	Cache internalcache.Cache

	Extenders []framework.Extender

	// NextPod should be a function that blocks until the next pod
	// is available. We don't use a channel for this, because scheduling
	// a pod may take some amount of time and we don't want pods to get
	// stale while they sit in a channel.
	NextPod func() *framework.QueuedPodInfo

	// FailureHandler is called upon a scheduling failure.
	FailureHandler FailureHandlerFn

	// SchedulePod tries to schedule the given pod to one of the nodes in the node list.
	// On success it returns a ScheduleResult with the name of the suggested host;
	// otherwise it returns a FitError with the reasons the pod could not be scheduled.
	SchedulePod func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error)

	// Close this to shut down the scheduler.
	StopEverything <-chan struct{}

	// SchedulingQueue holds pods to be scheduled.
	SchedulingQueue internalqueue.SchedulingQueue

	// Profiles are the scheduling profiles.
	Profiles profile.Map

	client clientset.Interface

	nodeInfoSnapshot *internalcache.Snapshot

	percentageOfNodesToScore int32

	nextStartNodeIndex int
}

func (s *Scheduler) applyDefaultHandlers() {
	s.SchedulePod = s.schedulePod
	s.FailureHandler = s.handleSchedulingFailure
}

type schedulerOptions struct {
	componentConfigVersion string
	kubeConfig             *restclient.Config
	// Overridden by profile level percentageOfNodesToScore if set in v1.
	percentageOfNodesToScore          int32
	podInitialBackoffSeconds          int64
	podMaxBackoffSeconds              int64
	podMaxInUnschedulablePodsDuration time.Duration
	// Contains out-of-tree plugins to be merged with the in-tree registry.
	frameworkOutOfTreeRegistry frameworkruntime.Registry
	profiles                   []schedulerapi.KubeSchedulerProfile
	extenders                  []schedulerapi.Extender
	frameworkCapturer          FrameworkCapturer
	parallelism                int32
	applyDefaultProfile        bool
}

// Option configures a Scheduler.
type Option func(*schedulerOptions)
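
// Illustrative usage sketch: options are applied in order by New, so a later
// option overrides an earlier one that sets the same field. The client,
// informer factories, recorder factory, stop channel and profile below are
// hypothetical placeholders supplied by the caller.
//
//	sched, err := New(client, informerFactory, dynInformerFactory, recorderFactory, stopCh,
//		WithProfiles(myProfile), // myProfile is a schedulerapi.KubeSchedulerProfile
//		WithParallelism(32),
//		WithPodMaxBackoffSeconds(60),
//	)
//	if err != nil {
//		// handle configuration error
//	}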

// ScheduleResult represents the result of scheduling a pod.
type ScheduleResult struct {
	// Name of the selected node.
	SuggestedHost string
	// The number of nodes the scheduler evaluated the pod against in the filtering
	// phase and beyond.
	EvaluatedNodes int
	// The number of nodes out of the evaluated ones that fit the pod.
	FeasibleNodes int
	// The nominating info for scheduling cycle.
	nominatingInfo *framework.NominatingInfo
}

// WithComponentConfigVersion sets the component config version to the
// KubeSchedulerConfiguration version used. The string should be the full
// scheme group/version of the external type we converted from (for example
// "kubescheduler.config.k8s.io/v1").
func WithComponentConfigVersion(apiVersion string) Option {
	return func(o *schedulerOptions) {
		o.componentConfigVersion = apiVersion
	}
}

// WithKubeConfig sets the kube config for Scheduler.
func WithKubeConfig(cfg *restclient.Config) Option {
	return func(o *schedulerOptions) {
		o.kubeConfig = cfg
	}
}

// WithProfiles sets profiles for Scheduler. By default, there is one profile
// with the name "default-scheduler".
func WithProfiles(p ...schedulerapi.KubeSchedulerProfile) Option {
	return func(o *schedulerOptions) {
		o.profiles = p
		o.applyDefaultProfile = false
	}
}

// WithParallelism sets the parallelism for all scheduler algorithms. Default is 16.
func WithParallelism(threads int32) Option {
	return func(o *schedulerOptions) {
		o.parallelism = threads
	}
}

// WithPercentageOfNodesToScore sets percentageOfNodesToScore for Scheduler.
// The default value of 0 will use an adaptive percentage: 50 - (num of nodes)/125.
func WithPercentageOfNodesToScore(percentageOfNodesToScore *int32) Option {
	return func(o *schedulerOptions) {
		if percentageOfNodesToScore != nil {
			o.percentageOfNodesToScore = *percentageOfNodesToScore
		}
	}
}
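
// Worked example of the adaptive default mentioned above (a rough sketch; the
// exact computation and its lower-bound clamping live in the scheduling code,
// not here): with 5000 nodes the percentage is 50 - 5000/125 = 10, so the
// scheduler stops filtering once it has found roughly 500 feasible nodes.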

// WithFrameworkOutOfTreeRegistry sets the registry for out-of-tree plugins. Those plugins
// will be appended to the default registry.
func WithFrameworkOutOfTreeRegistry(registry frameworkruntime.Registry) Option {
	return func(o *schedulerOptions) {
		o.frameworkOutOfTreeRegistry = registry
	}
}
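
// Illustrative sketch (hypothetical plugin name and factory): an out-of-tree
// registry is a map from plugin name to a frameworkruntime.PluginFactory, and
// New merges it with the in-tree registry.
//
//	registry := frameworkruntime.Registry{
//		"MyOutOfTreePlugin": myPluginFactory, // myPluginFactory is a hypothetical frameworkruntime.PluginFactory
//	}
//	sched, err := New(client, informerFactory, dynInformerFactory, recorderFactory, stopCh,
//		WithFrameworkOutOfTreeRegistry(registry),
//	)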

// WithPodInitialBackoffSeconds sets podInitialBackoffSeconds for Scheduler; the default value is 1 second.
func WithPodInitialBackoffSeconds(podInitialBackoffSeconds int64) Option {
	return func(o *schedulerOptions) {
		o.podInitialBackoffSeconds = podInitialBackoffSeconds
	}
}

// WithPodMaxBackoffSeconds sets podMaxBackoffSeconds for Scheduler; the default value is 10 seconds.
func WithPodMaxBackoffSeconds(podMaxBackoffSeconds int64) Option {
	return func(o *schedulerOptions) {
		o.podMaxBackoffSeconds = podMaxBackoffSeconds
	}
}

// WithPodMaxInUnschedulablePodsDuration sets podMaxInUnschedulablePodsDuration for PriorityQueue.
func WithPodMaxInUnschedulablePodsDuration(duration time.Duration) Option {
	return func(o *schedulerOptions) {
		o.podMaxInUnschedulablePodsDuration = duration
	}
}

// WithExtenders sets extenders for the Scheduler.
func WithExtenders(e ...schedulerapi.Extender) Option {
	return func(o *schedulerOptions) {
		o.extenders = e
	}
}

// FrameworkCapturer is used for registering a notify function in building framework.
type FrameworkCapturer func(schedulerapi.KubeSchedulerProfile)

// WithBuildFrameworkCapturer sets a notify function for getting buildFramework details.
func WithBuildFrameworkCapturer(fc FrameworkCapturer) Option {
	return func(o *schedulerOptions) {
		o.frameworkCapturer = fc
	}
}
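
// Illustrative sketch: a capturer that simply logs each profile as its framework
// is built. The function receives the KubeSchedulerProfile for every profile
// that New builds a framework for.
//
//	capturer := func(p schedulerapi.KubeSchedulerProfile) {
//		klog.InfoS("Built framework for profile", "profile", p.SchedulerName)
//	}
//	sched, err := New(client, informerFactory, dynInformerFactory, recorderFactory, stopCh,
//		WithBuildFrameworkCapturer(capturer),
//	)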

var defaultSchedulerOptions = schedulerOptions{
	percentageOfNodesToScore:          schedulerapi.DefaultPercentageOfNodesToScore,
	podInitialBackoffSeconds:          int64(internalqueue.DefaultPodInitialBackoffDuration.Seconds()),
	podMaxBackoffSeconds:              int64(internalqueue.DefaultPodMaxBackoffDuration.Seconds()),
	podMaxInUnschedulablePodsDuration: internalqueue.DefaultPodMaxInUnschedulablePodsDuration,
	parallelism:                       int32(parallelize.DefaultParallelism),
	// Ideally we would statically set the default profile here, but we can't because
	// creating the default profile may require testing feature gates, which may get
	// set dynamically in tests. Therefore, we delay creating it until New is actually
	// invoked.
	applyDefaultProfile: true,
}

// New returns a Scheduler.
func New(client clientset.Interface,
	informerFactory informers.SharedInformerFactory,
	dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,
	recorderFactory profile.RecorderFactory,
	stopCh <-chan struct{},
	opts ...Option) (*Scheduler, error) {

	stopEverything := stopCh
	if stopEverything == nil {
		stopEverything = wait.NeverStop
	}

	options := defaultSchedulerOptions
	for _, opt := range opts {
		opt(&options)
	}

	if options.applyDefaultProfile {
		var versionedCfg configv1.KubeSchedulerConfiguration
		scheme.Scheme.Default(&versionedCfg)
		cfg := schedulerapi.KubeSchedulerConfiguration{}
		if err := scheme.Scheme.Convert(&versionedCfg, &cfg, nil); err != nil {
			return nil, err
		}
		options.profiles = cfg.Profiles
	}

	registry := frameworkplugins.NewInTreeRegistry()
	if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
		return nil, err
	}

	metrics.Register()

	extenders, err := buildExtenders(options.extenders, options.profiles)
	if err != nil {
		return nil, fmt.Errorf("couldn't build extenders: %w", err)
	}

	podLister := informerFactory.Core().V1().Pods().Lister()
	nodeLister := informerFactory.Core().V1().Nodes().Lister()

	snapshot := internalcache.NewEmptySnapshot()
	clusterEventMap := make(map[framework.ClusterEvent]sets.String)
	metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopCh)

	profiles, err := profile.NewMap(options.profiles, registry, recorderFactory, stopCh,
		frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
		frameworkruntime.WithClientSet(client),
		frameworkruntime.WithKubeConfig(options.kubeConfig),
		frameworkruntime.WithInformerFactory(informerFactory),
		frameworkruntime.WithSnapshotSharedLister(snapshot),
		frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
		frameworkruntime.WithClusterEventMap(clusterEventMap),
		frameworkruntime.WithParallelism(int(options.parallelism)),
		frameworkruntime.WithExtenders(extenders),
		frameworkruntime.WithMetricsRecorder(metricsRecorder),
	)
	if err != nil {
		return nil, fmt.Errorf("initializing profiles: %v", err)
	}

	if len(profiles) == 0 {
		return nil, errors.New("at least one profile is required")
	}

	preEnqueuePluginMap := make(map[string][]framework.PreEnqueuePlugin)
	for profileName, profile := range profiles {
		preEnqueuePluginMap[profileName] = profile.PreEnqueuePlugins()
	}
	podQueue := internalqueue.NewSchedulingQueue(
		profiles[options.profiles[0].SchedulerName].QueueSortFunc(),
		informerFactory,
		internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second),
		internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second),
		internalqueue.WithPodLister(podLister),
		internalqueue.WithClusterEventMap(clusterEventMap),
		internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),
		internalqueue.WithPreEnqueuePluginMap(preEnqueuePluginMap),
		internalqueue.WithPluginMetricsSamplePercent(pluginMetricsSamplePercent),
		internalqueue.WithMetricsRecorder(*metricsRecorder),
	)

	for _, fwk := range profiles {
		fwk.SetPodNominator(podQueue)
	}

	schedulerCache := internalcache.New(durationToExpireAssumedPod, stopEverything)

	// Setup cache debugger.
	debugger := cachedebugger.New(nodeLister, podLister, schedulerCache, podQueue)
	debugger.ListenForSignal(stopEverything)

	sched := &Scheduler{
		Cache:                    schedulerCache,
		client:                   client,
		nodeInfoSnapshot:         snapshot,
		percentageOfNodesToScore: options.percentageOfNodesToScore,
		Extenders:                extenders,
		NextPod:                  internalqueue.MakeNextPodFunc(podQueue),
		StopEverything:           stopEverything,
		SchedulingQueue:          podQueue,
		Profiles:                 profiles,
	}
	sched.applyDefaultHandlers()

	addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(clusterEventMap))

	return sched, nil
}

// Run begins watching and scheduling. It starts scheduling and blocks until the context is done.
func (sched *Scheduler) Run(ctx context.Context) {
	sched.SchedulingQueue.Run()

	// We need to start the scheduleOne loop in a dedicated goroutine, because
	// scheduleOne blocks while waiting for the next item from the SchedulingQueue.
	// If there are no new pods to schedule, it would hang there, and running it
	// in this goroutine would block the closing of the SchedulingQueue below,
	// in effect causing a deadlock on shutdown.
	go wait.UntilWithContext(ctx, sched.scheduleOne, 0)

	<-ctx.Done()
	sched.SchedulingQueue.Close()
}
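
// Illustrative usage sketch: Run is typically the final, blocking call of a
// scheduler process; cancelling the context is what shuts the loop down.
//
//	ctx, cancel := context.WithCancel(context.Background())
//	defer cancel()
//	sched.Run(ctx) // blocks until ctx is cancelled, then closes the scheduling queue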

// NewInformerFactory creates a SharedInformerFactory and initializes a scheduler specific
// in-place podInformer.
func NewInformerFactory(cs clientset.Interface, resyncPeriod time.Duration) informers.SharedInformerFactory {
	informerFactory := informers.NewSharedInformerFactory(cs, resyncPeriod)
	informerFactory.InformerFor(&v1.Pod{}, newPodInformer)
	return informerFactory
}

// buildExtenders converts the configured extender entries into framework.Extenders
// and propagates any extender-managed resources that the scheduler should ignore
// into the NodeResourcesFit plugin args of every profile.
func buildExtenders(extenders []schedulerapi.Extender, profiles []schedulerapi.KubeSchedulerProfile) ([]framework.Extender, error) {
	var fExtenders []framework.Extender
	if len(extenders) == 0 {
		return nil, nil
	}

	var ignoredExtendedResources []string
	var ignorableExtenders []framework.Extender
	for i := range extenders {
		klog.V(2).InfoS("Creating extender", "extender", extenders[i])
		extender, err := NewHTTPExtender(&extenders[i])
		if err != nil {
			return nil, err
		}
		if !extender.IsIgnorable() {
			fExtenders = append(fExtenders, extender)
		} else {
			ignorableExtenders = append(ignorableExtenders, extender)
		}
		for _, r := range extenders[i].ManagedResources {
			if r.IgnoredByScheduler {
				ignoredExtendedResources = append(ignoredExtendedResources, r.Name)
			}
		}
	}
	// Place ignorable extenders at the tail of the extenders list.
	fExtenders = append(fExtenders, ignorableExtenders...)

	// If there are any extended resources found from the Extenders, append them to the pluginConfig for each profile.
	// This should only have an effect on ComponentConfig, where it is possible to configure Extenders and
	// plugin args (and in which case the extender ignored resources take precedence).
	if len(ignoredExtendedResources) == 0 {
		return fExtenders, nil
	}

	for i := range profiles {
		prof := &profiles[i]
		var found = false
		for k := range prof.PluginConfig {
			if prof.PluginConfig[k].Name == noderesources.Name {
				// Update the existing args
				pc := &prof.PluginConfig[k]
				args, ok := pc.Args.(*schedulerapi.NodeResourcesFitArgs)
				if !ok {
					return nil, fmt.Errorf("want args to be of type NodeResourcesFitArgs, got %T", pc.Args)
				}
				args.IgnoredResources = ignoredExtendedResources
				found = true
				break
			}
		}
		if !found {
			return nil, fmt.Errorf("can't find NodeResourcesFitArgs in plugin config")
		}
	}
	return fExtenders, nil
}
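
// Illustrative sketch of an extender entry (hypothetical URL and resource name).
// Because ManagedResources marks "example.com/foo" as ignored by the scheduler,
// buildExtenders records it and copies it into NodeResourcesFitArgs.IgnoredResources
// for every profile:
//
//	schedulerapi.Extender{
//		URLPrefix:  "http://extender.example.com",
//		FilterVerb: "filter",
//		Ignorable:  true,
//		ManagedResources: []schedulerapi.ExtenderManagedResource{
//			{Name: "example.com/foo", IgnoredByScheduler: true},
//		},
//	}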

// FailureHandlerFn is the signature of Scheduler.FailureHandler; it is called upon a scheduling failure.
type FailureHandlerFn func(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, status *framework.Status, nominatingInfo *framework.NominatingInfo, start time.Time)

// unionedGVKs unions the ActionTypes of all cluster events that share the same GVK.
func unionedGVKs(m map[framework.ClusterEvent]sets.String) map[framework.GVK]framework.ActionType {
	gvkMap := make(map[framework.GVK]framework.ActionType)
	for evt := range m {
		if _, ok := gvkMap[evt.Resource]; ok {
			gvkMap[evt.Resource] |= evt.ActionType
		} else {
			gvkMap[evt.Resource] = evt.ActionType
		}
	}
	return gvkMap
}
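
// Illustrative example: if the cluster event map contains both
// {Resource: framework.Node, ActionType: framework.Add} and
// {Resource: framework.Node, ActionType: framework.UpdateNodeLabel},
// the result maps framework.Node to framework.Add | framework.UpdateNodeLabel.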

// newPodInformer creates a shared index informer that returns only non-terminal pods.
// The PodInformer allows indexers to be added, but note that only non-conflict indexers are allowed.
func newPodInformer(cs clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
	selector := fmt.Sprintf("status.phase!=%v,status.phase!=%v", v1.PodSucceeded, v1.PodFailed)
	tweakListOptions := func(options *metav1.ListOptions) {
		options.FieldSelector = selector
	}
	return coreinformers.NewFilteredPodInformer(cs, metav1.NamespaceAll, resyncPeriod, cache.Indexers{}, tweakListOptions)
}
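
// Illustrative sketch (hypothetical indexer name): because the informer above is a
// cache.SharedIndexInformer, callers may attach additional, non-conflicting indexers,
// for example to look pods up by the node they are assigned to:
//
//	podInformer := newPodInformer(cs, 0)
//	_ = podInformer.AddIndexers(cache.Indexers{
//		"byNode": func(obj interface{}) ([]string, error) {
//			pod, ok := obj.(*v1.Pod)
//			if !ok {
//				return nil, nil
//			}
//			return []string{pod.Spec.NodeName}, nil
//		},
//	})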