diff --git a/plugin/cmd/kube-scheduler/app/server.go b/plugin/cmd/kube-scheduler/app/server.go index d7f7abe2cda..93982f898e3 100644 --- a/plugin/cmd/kube-scheduler/app/server.go +++ b/plugin/cmd/kube-scheduler/app/server.go @@ -40,6 +40,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" + storageinformers "k8s.io/client-go/informers/storage/v1" clientset "k8s.io/client-go/kubernetes" v1core "k8s.io/client-go/kubernetes/typed/core/v1" restclient "k8s.io/client-go/rest" @@ -625,6 +626,11 @@ func (s *SchedulerServer) Run(stop chan struct{}) error { // SchedulerConfig creates the scheduler configuration. This is exposed for use // by tests. func (s *SchedulerServer) SchedulerConfig() (*scheduler.Config, error) { + var storageClassInformer storageinformers.StorageClassInformer + if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { + storageClassInformer = s.InformerFactory.Storage().V1().StorageClasses() + } + // Set up the configurator which can create schedulers from configs. configurator := factory.NewConfigFactory( s.SchedulerName, @@ -638,6 +644,7 @@ func (s *SchedulerServer) SchedulerConfig() (*scheduler.Config, error) { s.InformerFactory.Apps().V1beta1().StatefulSets(), s.InformerFactory.Core().V1().Services(), s.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(), + storageClassInformer, s.HardPodAffinitySymmetricWeight, utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache), ) diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go index 29bd87f2a76..6fb6aefdb23 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go @@ -439,6 +439,18 @@ func ClusterRoles() []rbac.ClusterRole { }) } + if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { + // Find the scheduler role + for i, role := range roles { + if role.Name == "system:kube-scheduler" { + pvRule := rbac.NewRule("update").Groups(legacyGroup).Resources("persistentvolumes").RuleOrDie() + scRule := rbac.NewRule(Read...).Groups(storageGroup).Resources("storageclasses").RuleOrDie() + roles[i].Rules = append(role.Rules, pvRule, scRule) + break + } + } + } + addClusterRoleLabel(roles) return roles } diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go b/plugin/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go index 71283193a35..517d26033cc 100644 --- a/plugin/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go +++ b/plugin/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go @@ -506,6 +506,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), informerFactory.Policy().V1beta1().PodDisruptionBudgets(), + informerFactory.Storage().V1().StorageClasses(), v1.DefaultHardPodAffinitySymmetricWeight, enableEquivalenceCache, ).CreateFromConfig(policy); err != nil { diff --git a/plugin/pkg/scheduler/core/extender_test.go b/plugin/pkg/scheduler/core/extender_test.go index fafd2ae6bd4..143ba795dd6 100644 --- a/plugin/pkg/scheduler/core/extender_test.go +++ b/plugin/pkg/scheduler/core/extender_test.go @@ -317,7 +317,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { } queue := NewSchedulingQueue() scheduler := NewGenericScheduler( - cache, nil, queue, test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, extenders) + cache, nil, queue, test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, extenders, nil) podIgnored := &v1.Pod{} machine, err := scheduler.Schedule(podIgnored, schedulertesting.FakeNodeLister(makeNodeList(test.nodes))) if test.expectsErr { diff --git a/plugin/pkg/scheduler/core/generic_scheduler.go b/plugin/pkg/scheduler/core/generic_scheduler.go index 2c2f3e3dd6e..b50d475d82e 100644 --- a/plugin/pkg/scheduler/core/generic_scheduler.go +++ b/plugin/pkg/scheduler/core/generic_scheduler.go @@ -36,6 +36,7 @@ import ( "k8s.io/kubernetes/plugin/pkg/scheduler/util" "github.com/golang/glog" + "k8s.io/kubernetes/plugin/pkg/scheduler/volumebinder" ) type FailedPredicateMap map[string][]algorithm.PredicateFailureReason @@ -91,6 +92,7 @@ type genericScheduler struct { lastNodeIndex uint64 cachedNodeInfoMap map[string]*schedulercache.NodeInfo + volumeBinder *volumebinder.VolumeBinder } // Schedule tries to schedule the given pod to one of node in the node list. @@ -867,7 +869,10 @@ func nodesWherePreemptionMightHelp(pod *v1.Pod, nodes []*v1.Node, failedPredicat predicates.ErrNodeNotReady, predicates.ErrNodeNetworkUnavailable, predicates.ErrNodeUnschedulable, - predicates.ErrNodeUnknownCondition: + predicates.ErrNodeUnknownCondition, + predicates.ErrVolumeZoneConflict, + predicates.ErrVolumeNodeConflict, + predicates.ErrVolumeBindConflict: unresolvableReasonExist = true break // TODO(bsalamat): Please add affinity failure cases once we have specific affinity failure errors. @@ -909,7 +914,8 @@ func NewGenericScheduler( predicateMetaProducer algorithm.PredicateMetadataProducer, prioritizers []algorithm.PriorityConfig, priorityMetaProducer algorithm.MetadataProducer, - extenders []algorithm.SchedulerExtender) algorithm.ScheduleAlgorithm { + extenders []algorithm.SchedulerExtender, + volumeBinder *volumebinder.VolumeBinder) algorithm.ScheduleAlgorithm { return &genericScheduler{ cache: cache, equivalenceCache: eCache, @@ -920,5 +926,6 @@ func NewGenericScheduler( priorityMetaProducer: priorityMetaProducer, extenders: extenders, cachedNodeInfoMap: make(map[string]*schedulercache.NodeInfo), + volumeBinder: volumeBinder, } } diff --git a/plugin/pkg/scheduler/core/generic_scheduler_test.go b/plugin/pkg/scheduler/core/generic_scheduler_test.go index 99015676537..d5d9b341096 100644 --- a/plugin/pkg/scheduler/core/generic_scheduler_test.go +++ b/plugin/pkg/scheduler/core/generic_scheduler_test.go @@ -311,7 +311,7 @@ func TestGenericScheduler(t *testing.T) { } scheduler := NewGenericScheduler( - cache, nil, NewSchedulingQueue(), test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, []algorithm.SchedulerExtender{}) + cache, nil, NewSchedulingQueue(), test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, []algorithm.SchedulerExtender{}, nil) machine, err := scheduler.Schedule(test.pod, schedulertesting.FakeNodeLister(makeNodeList(test.nodes))) if !reflect.DeepEqual(err, test.wErr) { @@ -1190,7 +1190,7 @@ func TestPreempt(t *testing.T) { extenders = append(extenders, extender) } scheduler := NewGenericScheduler( - cache, nil, NewSchedulingQueue(), map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources}, algorithm.EmptyPredicateMetadataProducer, []algorithm.PriorityConfig{{Function: numericPriority, Weight: 1}}, algorithm.EmptyMetadataProducer, extenders) + cache, nil, NewSchedulingQueue(), map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources}, algorithm.EmptyPredicateMetadataProducer, []algorithm.PriorityConfig{{Function: numericPriority, Weight: 1}}, algorithm.EmptyMetadataProducer, extenders, nil) // Call Preempt and check the expected results. node, victims, _, err := scheduler.Preempt(test.pod, schedulertesting.FakeNodeLister(makeNodeList(nodeNames)), error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap})) if err != nil { diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index e77d4958303..764f449ccc4 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -37,18 +37,22 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" + utilfeature "k8s.io/apiserver/pkg/util/feature" appsinformers "k8s.io/client-go/informers/apps/v1beta1" coreinformers "k8s.io/client-go/informers/core/v1" extensionsinformers "k8s.io/client-go/informers/extensions/v1beta1" policyinformers "k8s.io/client-go/informers/policy/v1beta1" + storageinformers "k8s.io/client-go/informers/storage/v1" clientset "k8s.io/client-go/kubernetes" appslisters "k8s.io/client-go/listers/apps/v1beta1" corelisters "k8s.io/client-go/listers/core/v1" extensionslisters "k8s.io/client-go/listers/extensions/v1beta1" policylisters "k8s.io/client-go/listers/policy/v1beta1" + storagelisters "k8s.io/client-go/listers/storage/v1" "k8s.io/client-go/tools/cache" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/apis/core/helper" + "k8s.io/kubernetes/pkg/features" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" "k8s.io/kubernetes/plugin/pkg/scheduler" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" @@ -58,6 +62,7 @@ import ( "k8s.io/kubernetes/plugin/pkg/scheduler/core" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" "k8s.io/kubernetes/plugin/pkg/scheduler/util" + "k8s.io/kubernetes/plugin/pkg/scheduler/volumebinder" ) const ( @@ -98,6 +103,8 @@ type configFactory struct { statefulSetLister appslisters.StatefulSetLister // a means to list all PodDisruptionBudgets pdbLister policylisters.PodDisruptionBudgetLister + // a means to list all StorageClasses + storageClassLister storagelisters.StorageClassLister // Close this to stop all reflectors StopEverything chan struct{} @@ -120,6 +127,9 @@ type configFactory struct { // Enable equivalence class cache enableEquivalenceClassCache bool + + // Handles volume binding decisions + volumeBinder *volumebinder.VolumeBinder } // NewConfigFactory initializes the default implementation of a Configurator To encourage eventual privatization of the struct type, we only @@ -136,12 +146,19 @@ func NewConfigFactory( statefulSetInformer appsinformers.StatefulSetInformer, serviceInformer coreinformers.ServiceInformer, pdbInformer policyinformers.PodDisruptionBudgetInformer, + storageClassInformer storageinformers.StorageClassInformer, hardPodAffinitySymmetricWeight int32, enableEquivalenceClassCache bool, ) scheduler.Configurator { stopEverything := make(chan struct{}) schedulerCache := schedulercache.New(30*time.Second, stopEverything) + // storageClassInformer is only enabled through VolumeScheduling feature gate + var storageClassLister storagelisters.StorageClassLister + if storageClassInformer != nil { + storageClassLister = storageClassInformer.Lister() + } + c := &configFactory{ client: client, podLister: schedulerCache, @@ -153,6 +170,7 @@ func NewConfigFactory( replicaSetLister: replicaSetInformer.Lister(), statefulSetLister: statefulSetInformer.Lister(), pdbLister: pdbInformer.Lister(), + storageClassLister: storageClassLister, schedulerCache: schedulerCache, StopEverything: stopEverything, schedulerName: schedulerName, @@ -208,9 +226,14 @@ func NewConfigFactory( } }, DeleteFunc: func(obj interface{}) { - if err := c.podQueue.Delete(obj.(*v1.Pod)); err != nil { + pod := obj.(*v1.Pod) + if err := c.podQueue.Delete(pod); err != nil { runtime.HandleError(fmt.Errorf("unable to dequeue %T: %v", obj, err)) } + if c.volumeBinder != nil { + // Volume binder only wants to keep unassigned pods + c.volumeBinder.DeletePodBindings(pod) + } }, }, }, @@ -252,6 +275,7 @@ func NewConfigFactory( pvcInformer.Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: c.onPvcAdd, + UpdateFunc: c.onPvcUpdate, DeleteFunc: c.onPvcDelete, }, ) @@ -272,6 +296,11 @@ func NewConfigFactory( // Existing equivalence cache should not be affected by add/delete RC/Deployment etc, // it only make sense when pod is scheduled or deleted + if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { + // Setup volume binder + c.volumeBinder = volumebinder.NewVolumeBinder(client, pvcInformer, pvInformer, nodeInformer, storageClassInformer) + } + return c } @@ -365,6 +394,12 @@ func (c *configFactory) invalidatePredicatesForPv(pv *v1.PersistentVolume) { if pv.Spec.AzureDisk != nil { invalidPredicates.Insert("MaxAzureDiskVolumeCount") } + + if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { + // Add/delete impacts the available PVs to choose from + invalidPredicates.Insert(predicates.CheckVolumeBinding) + } + c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates) } @@ -380,6 +415,27 @@ func (c *configFactory) onPvcAdd(obj interface{}) { c.podQueue.MoveAllToActiveQueue() } +func (c *configFactory) onPvcUpdate(old, new interface{}) { + if !utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { + return + } + + if c.enableEquivalenceClassCache { + newPVC, ok := new.(*v1.PersistentVolumeClaim) + if !ok { + glog.Errorf("cannot convert to *v1.PersistentVolumeClaim: %v", new) + return + } + oldPVC, ok := old.(*v1.PersistentVolumeClaim) + if !ok { + glog.Errorf("cannot convert to *v1.PersistentVolumeClaim: %v", old) + return + } + c.invalidatePredicatesForPvcUpdate(oldPVC, newPVC) + } + c.podQueue.MoveAllToActiveQueue() +} + func (c *configFactory) onPvcDelete(obj interface{}) { if c.enableEquivalenceClassCache { var pvc *v1.PersistentVolumeClaim @@ -407,6 +463,21 @@ func (c *configFactory) invalidatePredicatesForPvc(pvc *v1.PersistentVolumeClaim } } +func (c *configFactory) invalidatePredicatesForPvcUpdate(old, new *v1.PersistentVolumeClaim) { + invalidPredicates := sets.NewString() + + if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { + if old.Spec.VolumeName != new.Spec.VolumeName { + // PVC volume binding has changed + invalidPredicates.Insert(predicates.CheckVolumeBinding) + } + } + + if invalidPredicates.Len() > 0 { + c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates) + } +} + func (c *configFactory) onServiceAdd(obj interface{}) { if c.enableEquivalenceClassCache { c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet) @@ -468,6 +539,7 @@ func (c *configFactory) addPodToCache(obj interface{}) { } c.podQueue.AssignedPodAdded(pod) + // NOTE: Updating equivalence cache of addPodToCache has been // handled optimistically in InvalidateCachedPredicateItemForPodAdd. } @@ -830,7 +902,8 @@ func (f *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, f.equivalencePodCache = core.NewEquivalenceCache(getEquivalencePodFunc) glog.Info("Created equivalence class cache") } - algo := core.NewGenericScheduler(f.schedulerCache, f.equivalencePodCache, f.podQueue, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders) + + algo := core.NewGenericScheduler(f.schedulerCache, f.equivalencePodCache, f.podQueue, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders, f.volumeBinder) podBackoff := util.CreateDefaultPodBackoff() return &scheduler.Config{ @@ -850,6 +923,7 @@ func (f *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, }, Error: f.MakeDefaultErrorFunc(podBackoff, f.podQueue), StopEverything: f.StopEverything, + VolumeBinder: f.volumeBinder, }, nil } @@ -898,15 +972,17 @@ func (f *configFactory) GetPredicates(predicateKeys sets.String) (map[string]alg func (f *configFactory) getPluginArgs() (*PluginFactoryArgs, error) { return &PluginFactoryArgs{ - PodLister: f.podLister, - ServiceLister: f.serviceLister, - ControllerLister: f.controllerLister, - ReplicaSetLister: f.replicaSetLister, - StatefulSetLister: f.statefulSetLister, - NodeLister: &nodeLister{f.nodeLister}, - NodeInfo: &predicates.CachedNodeInfo{NodeLister: f.nodeLister}, - PVInfo: &predicates.CachedPersistentVolumeInfo{PersistentVolumeLister: f.pVLister}, - PVCInfo: &predicates.CachedPersistentVolumeClaimInfo{PersistentVolumeClaimLister: f.pVCLister}, + PodLister: f.podLister, + ServiceLister: f.serviceLister, + ControllerLister: f.controllerLister, + ReplicaSetLister: f.replicaSetLister, + StatefulSetLister: f.statefulSetLister, + NodeLister: &nodeLister{f.nodeLister}, + NodeInfo: &predicates.CachedNodeInfo{NodeLister: f.nodeLister}, + PVInfo: &predicates.CachedPersistentVolumeInfo{PersistentVolumeLister: f.pVLister}, + PVCInfo: &predicates.CachedPersistentVolumeClaimInfo{PersistentVolumeClaimLister: f.pVCLister}, + StorageClassInfo: &predicates.CachedStorageClassInfo{StorageClassLister: f.storageClassLister}, + VolumeBinder: f.volumeBinder, HardPodAffinitySymmetricWeight: f.hardPodAffinitySymmetricWeight, }, nil } @@ -1047,6 +1123,7 @@ func (factory *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, pod Namespace: pod.Namespace, Name: pod.Name, } + origPod := pod // When pod priority is enabled, we would like to place an unschedulable // pod in the unschedulable queue. This ensures that if the pod is nominated @@ -1066,11 +1143,21 @@ func (factory *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, pod if err == nil { if len(pod.Spec.NodeName) == 0 { podQueue.AddUnschedulableIfNotPresent(pod) + } else { + if factory.volumeBinder != nil { + // Volume binder only wants to keep unassigned pods + factory.volumeBinder.DeletePodBindings(pod) + } } break } if errors.IsNotFound(err) { glog.Warningf("A pod %v no longer exists", podID) + + if factory.volumeBinder != nil { + // Volume binder only wants to keep unassigned pods + factory.volumeBinder.DeletePodBindings(origPod) + } return } glog.Errorf("Error getting pod %v for retry: %v; retrying...", podID, err) diff --git a/plugin/pkg/scheduler/factory/factory_test.go b/plugin/pkg/scheduler/factory/factory_test.go index db73370ac09..437e9d4d6a0 100644 --- a/plugin/pkg/scheduler/factory/factory_test.go +++ b/plugin/pkg/scheduler/factory/factory_test.go @@ -66,6 +66,7 @@ func TestCreate(t *testing.T) { informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), informerFactory.Policy().V1beta1().PodDisruptionBudgets(), + informerFactory.Storage().V1().StorageClasses(), v1.DefaultHardPodAffinitySymmetricWeight, enableEquivalenceCache, ) @@ -99,6 +100,7 @@ func TestCreateFromConfig(t *testing.T) { informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), informerFactory.Policy().V1beta1().PodDisruptionBudgets(), + informerFactory.Storage().V1().StorageClasses(), v1.DefaultHardPodAffinitySymmetricWeight, enableEquivalenceCache, ) @@ -159,6 +161,7 @@ func TestCreateFromConfigWithHardPodAffinitySymmetricWeight(t *testing.T) { informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), informerFactory.Policy().V1beta1().PodDisruptionBudgets(), + informerFactory.Storage().V1().StorageClasses(), v1.DefaultHardPodAffinitySymmetricWeight, enableEquivalenceCache, ) @@ -220,6 +223,7 @@ func TestCreateFromEmptyConfig(t *testing.T) { informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), informerFactory.Policy().V1beta1().PodDisruptionBudgets(), + informerFactory.Storage().V1().StorageClasses(), v1.DefaultHardPodAffinitySymmetricWeight, enableEquivalenceCache, ) @@ -278,6 +282,7 @@ func TestDefaultErrorFunc(t *testing.T) { informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), informerFactory.Policy().V1beta1().PodDisruptionBudgets(), + informerFactory.Storage().V1().StorageClasses(), v1.DefaultHardPodAffinitySymmetricWeight, enableEquivalenceCache, ) @@ -388,6 +393,7 @@ func TestInvalidHardPodAffinitySymmetricWeight(t *testing.T) { informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), informerFactory.Policy().V1beta1().PodDisruptionBudgets(), + informerFactory.Storage().V1().StorageClasses(), -1, enableEquivalenceCache, ) @@ -435,6 +441,7 @@ func TestInvalidFactoryArgs(t *testing.T) { informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), informerFactory.Policy().V1beta1().PodDisruptionBudgets(), + informerFactory.Storage().V1().StorageClasses(), test.hardPodAffinitySymmetricWeight, enableEquivalenceCache, ) diff --git a/plugin/pkg/scheduler/factory/plugins.go b/plugin/pkg/scheduler/factory/plugins.go index 028546a71dc..a0de0f67fda 100644 --- a/plugin/pkg/scheduler/factory/plugins.go +++ b/plugin/pkg/scheduler/factory/plugins.go @@ -30,6 +30,7 @@ import ( schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" "github.com/golang/glog" + "k8s.io/kubernetes/plugin/pkg/scheduler/volumebinder" ) // PluginFactoryArgs are passed to all plugin factory functions. @@ -43,6 +44,8 @@ type PluginFactoryArgs struct { NodeInfo predicates.NodeInfo PVInfo predicates.PersistentVolumeInfo PVCInfo predicates.PersistentVolumeClaimInfo + StorageClassInfo predicates.StorageClassInfo + VolumeBinder *volumebinder.VolumeBinder HardPodAffinitySymmetricWeight int32 } diff --git a/plugin/pkg/scheduler/scheduler.go b/plugin/pkg/scheduler/scheduler.go index d9af3d67c96..2f63ad5312b 100644 --- a/plugin/pkg/scheduler/scheduler.go +++ b/plugin/pkg/scheduler/scheduler.go @@ -26,6 +26,7 @@ import ( clientset "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/record" + "k8s.io/kubernetes/pkg/controller/volume/persistentvolume" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" "k8s.io/kubernetes/plugin/pkg/scheduler/core" @@ -129,6 +130,8 @@ type Config struct { // Close this to shut down the scheduler. StopEverything chan struct{} + + VolumeBinder persistentvolume.SchedulerVolumeBinder } // NewFromConfigurator returns a new scheduler that is created entirely by the Configurator. Assumes Create() is implemented. diff --git a/plugin/pkg/scheduler/scheduler_test.go b/plugin/pkg/scheduler/scheduler_test.go index e44bf2bbe9c..3ef8f84700a 100644 --- a/plugin/pkg/scheduler/scheduler_test.go +++ b/plugin/pkg/scheduler/scheduler_test.go @@ -528,7 +528,8 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache. algorithm.EmptyPredicateMetadataProducer, []algorithm.PriorityConfig{}, algorithm.EmptyMetadataProducer, - []algorithm.SchedulerExtender{}) + []algorithm.SchedulerExtender{}, + nil) bindingChan := make(chan *v1.Binding, 1) errChan := make(chan error, 1) configurator := &FakeConfigurator{ @@ -566,7 +567,8 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc algorithm.EmptyPredicateMetadataProducer, []algorithm.PriorityConfig{}, algorithm.EmptyMetadataProducer, - []algorithm.SchedulerExtender{}) + []algorithm.SchedulerExtender{}, + nil) bindingChan := make(chan *v1.Binding, 2) configurator := &FakeConfigurator{ Config: &Config{ diff --git a/plugin/pkg/scheduler/volumebinder/volume_binder.go b/plugin/pkg/scheduler/volumebinder/volume_binder.go new file mode 100644 index 00000000000..957c4e18aac --- /dev/null +++ b/plugin/pkg/scheduler/volumebinder/volume_binder.go @@ -0,0 +1,74 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package volumebinder + +import ( + "time" + + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/wait" + coreinformers "k8s.io/client-go/informers/core/v1" + storageinformers "k8s.io/client-go/informers/storage/v1" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/util/workqueue" + "k8s.io/kubernetes/pkg/controller/volume/persistentvolume" +) + +// VolumeBinder sets up the volume binding library and manages +// the volume binding operations with a queue. +type VolumeBinder struct { + Binder persistentvolume.SchedulerVolumeBinder + BindQueue *workqueue.Type +} + +// NewVolumeBinder sets up the volume binding library and binding queue +func NewVolumeBinder( + client clientset.Interface, + pvcInformer coreinformers.PersistentVolumeClaimInformer, + pvInformer coreinformers.PersistentVolumeInformer, + nodeInformer coreinformers.NodeInformer, + storageClassInformer storageinformers.StorageClassInformer) *VolumeBinder { + + return &VolumeBinder{ + Binder: persistentvolume.NewVolumeBinder(client, pvcInformer, pvInformer, nodeInformer, storageClassInformer), + BindQueue: workqueue.NewNamed("podsToBind"), + } +} + +// NewFakeVolumeBinder sets up a fake volume binder and binding queue +func NewFakeVolumeBinder(config *persistentvolume.FakeVolumeBinderConfig) *VolumeBinder { + return &VolumeBinder{ + Binder: persistentvolume.NewFakeVolumeBinder(config), + BindQueue: workqueue.NewNamed("podsToBind"), + } +} + +// Run starts a goroutine to handle the binding queue with the given function. +func (b *VolumeBinder) Run(bindWorkFunc func(), stopCh <-chan struct{}) { + go wait.Until(bindWorkFunc, time.Second, stopCh) + + <-stopCh + b.BindQueue.ShutDown() +} + +// DeletePodBindings will delete the cached volume bindings for the given pod. +func (b *VolumeBinder) DeletePodBindings(pod *v1.Pod) { + cache := b.Binder.GetBindingsCache() + if cache != nil && pod != nil { + cache.DeleteBindings(pod) + } +} diff --git a/test/integration/scheduler/extender_test.go b/test/integration/scheduler/extender_test.go index 0cd189ae45d..780c3a0376c 100644 --- a/test/integration/scheduler/extender_test.go +++ b/test/integration/scheduler/extender_test.go @@ -369,6 +369,7 @@ func TestSchedulerExtender(t *testing.T) { informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), informerFactory.Policy().V1beta1().PodDisruptionBudgets(), + informerFactory.Storage().V1().StorageClasses(), v1.DefaultHardPodAffinitySymmetricWeight, enableEquivalenceCache, ) diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index 5d71637aa67..a6e851e5499 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -469,6 +469,7 @@ func TestMultiScheduler(t *testing.T) { informerFactory2.Apps().V1beta1().StatefulSets(), informerFactory2.Core().V1().Services(), informerFactory2.Policy().V1beta1().PodDisruptionBudgets(), + informerFactory2.Storage().V1().StorageClasses(), v1.DefaultHardPodAffinitySymmetricWeight, enableEquivalenceCache, ) diff --git a/test/integration/scheduler/taint_test.go b/test/integration/scheduler/taint_test.go index b0ad30457eb..da8fc51aa02 100644 --- a/test/integration/scheduler/taint_test.go +++ b/test/integration/scheduler/taint_test.go @@ -131,6 +131,7 @@ func TestTaintNodeByCondition(t *testing.T) { informers.Apps().V1beta1().StatefulSets(), informers.Core().V1().Services(), informers.Policy().V1beta1().PodDisruptionBudgets(), + informers.Storage().V1().StorageClasses(), v1.DefaultHardPodAffinitySymmetricWeight, true, // Enable EqualCache by default. ) diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go index da50d8113aa..4e66e0855cf 100644 --- a/test/integration/scheduler/util.go +++ b/test/integration/scheduler/util.go @@ -78,6 +78,7 @@ func initTest(t *testing.T, nsPrefix string) *TestContext { context.informerFactory.Apps().V1beta1().StatefulSets(), context.informerFactory.Core().V1().Services(), context.informerFactory.Policy().V1beta1().PodDisruptionBudgets(), + context.informerFactory.Storage().V1().StorageClasses(), v1.DefaultHardPodAffinitySymmetricWeight, true, ) diff --git a/test/integration/scheduler_perf/util.go b/test/integration/scheduler_perf/util.go index 3fc29da0ea6..4e6a9025c9f 100644 --- a/test/integration/scheduler_perf/util.go +++ b/test/integration/scheduler_perf/util.go @@ -75,6 +75,7 @@ func mustSetupScheduler() (schedulerConfigurator scheduler.Configurator, destroy informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), informerFactory.Policy().V1beta1().PodDisruptionBudgets(), + informerFactory.Storage().V1().StorageClasses(), v1.DefaultHardPodAffinitySymmetricWeight, enableEquivalenceCache, )