move PodConditionUpdater to scheduler

This commit is contained in:
Ahmad Diaa 2019-10-01 03:13:14 +02:00
parent 9ac9a1d397
commit 6c75e1baa2
5 changed files with 52 additions and 60 deletions

View File

@ -9,6 +9,7 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/scheduler",
visibility = ["//visibility:public"],
deps = [
"//pkg/api/v1/pod:go_default_library",
"//pkg/features:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/api/latest:go_default_library",

View File

@ -9,7 +9,6 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/scheduler/factory",
visibility = ["//visibility:public"],
deps = [
"//pkg/api/v1/pod:go_default_library",
"//pkg/features:go_default_library",
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",

View File

@ -45,7 +45,6 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/events"
"k8s.io/klog"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
@ -72,12 +71,6 @@ type Binder interface {
Bind(binding *v1.Binding) error
}
// PodConditionUpdater updates the condition of a pod based on the passed
// PodCondition
type PodConditionUpdater interface {
Update(pod *v1.Pod, podCondition *v1.PodCondition) error
}
// Config is an implementation of the Scheduler's configured input data.
// TODO over time we should make this struct a hidden implementation detail of the scheduler.
type Config struct {
@ -85,10 +78,6 @@ type Config struct {
Algorithm core.ScheduleAlgorithm
GetBinder func(pod *v1.Pod) Binder
// PodConditionUpdater is used only in case of scheduling errors. If we succeed
// with scheduling, PodScheduled condition will be updated in apiserver in /bind
// handler so that binding and setting PodCondition it is atomic.
PodConditionUpdater PodConditionUpdater
// PodPreemptor is used to evict pods and update 'NominatedNode' field of
// the preemptor pod.
PodPreemptor PodPreemptor
@ -454,12 +443,11 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e
)
return &Config{
SchedulerCache: c.schedulerCache,
Algorithm: algo,
GetBinder: getBinderFunc(c.client, extenders),
PodConditionUpdater: &podConditionUpdater{c.client},
PodPreemptor: &podPreemptor{c.client},
Framework: framework,
SchedulerCache: c.schedulerCache,
Algorithm: algo,
GetBinder: getBinderFunc(c.client, extenders),
PodPreemptor: &podPreemptor{c.client},
Framework: framework,
WaitForCacheSync: func() bool {
return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced)
},
@ -705,19 +693,6 @@ func (b *binder) Bind(binding *v1.Binding) error {
return b.Client.CoreV1().Pods(binding.Namespace).Bind(binding)
}
type podConditionUpdater struct {
Client clientset.Interface
}
func (p *podConditionUpdater) Update(pod *v1.Pod, condition *v1.PodCondition) error {
klog.V(3).Infof("Updating pod condition for %s/%s to (%s==%s, Reason=%s)", pod.Namespace, pod.Name, condition.Type, condition.Status, condition.Reason)
if podutil.UpdatePodCondition(&pod.Status, condition) {
_, err := p.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(pod)
return err
}
return nil
}
type podPreemptor struct {
Client clientset.Interface
}

View File

@ -35,6 +35,7 @@ import (
storageinformersv1beta1 "k8s.io/client-go/informers/storage/v1beta1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/events"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
latestschedulerapi "k8s.io/kubernetes/pkg/scheduler/api/latest"
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
@ -54,6 +55,12 @@ const (
SchedulerError = "SchedulerError"
)
// podConditionUpdater updates the condition of a pod based on the passed
// PodCondition
type podConditionUpdater interface {
update(pod *v1.Pod, podCondition *v1.PodCondition) error
}
// Scheduler watches for new unscheduled pods. It attempts to find
// nodes that they fit on and writes bindings back to the api server.
type Scheduler struct {
@ -66,7 +73,7 @@ type Scheduler struct {
// PodConditionUpdater is used only in case of scheduling errors. If we succeed
// with scheduling, PodScheduled condition will be updated in apiserver in /bind
// handler so that binding and setting PodCondition it is atomic.
PodConditionUpdater factory.PodConditionUpdater
podConditionUpdater podConditionUpdater
// PodPreemptor is used to evict pods and update 'NominatedNode' field of
// the preemptor pod.
PodPreemptor factory.PodPreemptor
@ -247,7 +254,7 @@ func New(client clientset.Interface,
// Create the scheduler.
sched := NewFromConfig(config)
sched.podConditionUpdater = &podConditionUpdaterImpl{client}
AddAllEventHandlers(sched, options.schedulerName, nodeInformer, podInformer, pvInformer, pvcInformer, serviceInformer, storageClassInformer, csiNodeInformer)
return sched, nil
}
@ -292,20 +299,19 @@ func initPolicyFromConfigMap(client clientset.Interface, policyRef *kubeschedule
func NewFromConfig(config *factory.Config) *Scheduler {
metrics.Register()
return &Scheduler{
SchedulerCache: config.SchedulerCache,
Algorithm: config.Algorithm,
GetBinder: config.GetBinder,
PodConditionUpdater: config.PodConditionUpdater,
PodPreemptor: config.PodPreemptor,
Framework: config.Framework,
NextPod: config.NextPod,
WaitForCacheSync: config.WaitForCacheSync,
Error: config.Error,
Recorder: config.Recorder,
StopEverything: config.StopEverything,
VolumeBinder: config.VolumeBinder,
DisablePreemption: config.DisablePreemption,
SchedulingQueue: config.SchedulingQueue,
SchedulerCache: config.SchedulerCache,
Algorithm: config.Algorithm,
GetBinder: config.GetBinder,
PodPreemptor: config.PodPreemptor,
Framework: config.Framework,
NextPod: config.NextPod,
WaitForCacheSync: config.WaitForCacheSync,
Error: config.Error,
Recorder: config.Recorder,
StopEverything: config.StopEverything,
VolumeBinder: config.VolumeBinder,
DisablePreemption: config.DisablePreemption,
SchedulingQueue: config.SchedulingQueue,
}
}
@ -324,7 +330,7 @@ func (sched *Scheduler) Run() {
func (sched *Scheduler) recordSchedulingFailure(pod *v1.Pod, err error, reason string, message string) {
sched.Error(pod, err)
sched.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", message)
if err := sched.PodConditionUpdater.Update(pod, &v1.PodCondition{
if err := sched.podConditionUpdater.update(pod, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: reason,
@ -670,6 +676,19 @@ func (sched *Scheduler) scheduleOne() {
}()
}
type podConditionUpdaterImpl struct {
Client clientset.Interface
}
func (p *podConditionUpdaterImpl) update(pod *v1.Pod, condition *v1.PodCondition) error {
klog.V(3).Infof("Updating pod condition for %s/%s to (%s==%s, Reason=%s)", pod.Namespace, pod.Name, condition.Type, condition.Status, condition.Reason)
if podutil.UpdatePodCondition(&pod.Status, condition) {
_, err := p.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(pod)
return err
}
return nil
}
// nodeResourceString returns a string representation of node resources.
func nodeResourceString(n *v1.Node) string {
if n == nil {

View File

@ -74,7 +74,7 @@ func (fb fakeBinder) Bind(binding *v1.Binding) error { return fb.b(binding) }
type fakePodConditionUpdater struct{}
func (fc fakePodConditionUpdater) Update(pod *v1.Pod, podCondition *v1.PodCondition) error {
func (fc fakePodConditionUpdater) update(pod *v1.Pod, podCondition *v1.PodCondition) error {
return nil
}
@ -286,7 +286,7 @@ func TestScheduler(t *testing.T) {
},
}
s := NewFromConfig(&factory.Config{
s := &Scheduler{
SchedulerCache: sCache,
Algorithm: item.algo,
GetBinder: func(pod *v1.Pod) factory.Binder {
@ -295,7 +295,7 @@ func TestScheduler(t *testing.T) {
return item.injectBindError
}}
},
PodConditionUpdater: fakePodConditionUpdater{},
podConditionUpdater: fakePodConditionUpdater{},
Error: func(p *v1.Pod, err error) {
gotPod = p
gotError = err
@ -306,7 +306,7 @@ func TestScheduler(t *testing.T) {
Framework: emptyFramework,
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, "scheduler"),
VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}),
})
}
called := make(chan struct{})
stopFunc := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
e, _ := obj.(*v1beta1.Event)
@ -670,7 +670,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
bindingChan := make(chan *v1.Binding, 1)
errChan := make(chan error, 1)
config := &factory.Config{
sched := &Scheduler{
SchedulerCache: scache,
Algorithm: algo,
GetBinder: func(pod *v1.Pod) factory.Binder {
@ -686,18 +686,16 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
errChan <- err
},
Recorder: &events.FakeRecorder{},
PodConditionUpdater: fakePodConditionUpdater{},
podConditionUpdater: fakePodConditionUpdater{},
PodPreemptor: fakePodPreemptor{},
Framework: emptyFramework,
VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}),
}
if recorder != nil {
config.Recorder = recorder
sched.Recorder = recorder
}
sched := NewFromConfig(config)
return sched, bindingChan, errChan
}
@ -722,7 +720,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
)
bindingChan := make(chan *v1.Binding, 2)
sched := NewFromConfig(&factory.Config{
sched := &Scheduler{
SchedulerCache: scache,
Algorithm: algo,
GetBinder: func(pod *v1.Pod) factory.Binder {
@ -742,12 +740,12 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
queuedPodStore.AddIfNotPresent(p)
},
Recorder: &events.FakeRecorder{},
PodConditionUpdater: fakePodConditionUpdater{},
podConditionUpdater: fakePodConditionUpdater{},
PodPreemptor: fakePodPreemptor{},
StopEverything: stop,
Framework: framework,
VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}),
})
}
return sched, bindingChan
}