Scheduler volume cache plumbing and predicate invalidation

This commit is contained in:
Michelle Au 2017-11-08 13:09:53 -08:00
parent fa6b62fa63
commit 01a8772111
17 changed files with 226 additions and 18 deletions

View File

@ -40,6 +40,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
storageinformers "k8s.io/client-go/informers/storage/v1"
clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest"
@ -625,6 +626,11 @@ func (s *SchedulerServer) Run(stop chan struct{}) error {
// SchedulerConfig creates the scheduler configuration. This is exposed for use
// by tests.
func (s *SchedulerServer) SchedulerConfig() (*scheduler.Config, error) {
var storageClassInformer storageinformers.StorageClassInformer
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
storageClassInformer = s.InformerFactory.Storage().V1().StorageClasses()
}
// Set up the configurator which can create schedulers from configs.
configurator := factory.NewConfigFactory(
s.SchedulerName,
@ -638,6 +644,7 @@ func (s *SchedulerServer) SchedulerConfig() (*scheduler.Config, error) {
s.InformerFactory.Apps().V1beta1().StatefulSets(),
s.InformerFactory.Core().V1().Services(),
s.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
storageClassInformer,
s.HardPodAffinitySymmetricWeight,
utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
)

View File

@ -439,6 +439,18 @@ func ClusterRoles() []rbac.ClusterRole {
})
}
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
// Find the scheduler role
for i, role := range roles {
if role.Name == "system:kube-scheduler" {
pvRule := rbac.NewRule("update").Groups(legacyGroup).Resources("persistentvolumes").RuleOrDie()
scRule := rbac.NewRule(Read...).Groups(storageGroup).Resources("storageclasses").RuleOrDie()
roles[i].Rules = append(role.Rules, pvRule, scRule)
break
}
}
}
addClusterRoleLabel(roles)
return roles
}

View File

@ -506,6 +506,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory.Storage().V1().StorageClasses(),
v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceCache,
).CreateFromConfig(policy); err != nil {

View File

@ -317,7 +317,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
}
queue := NewSchedulingQueue()
scheduler := NewGenericScheduler(
cache, nil, queue, test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, extenders)
cache, nil, queue, test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, extenders, nil)
podIgnored := &v1.Pod{}
machine, err := scheduler.Schedule(podIgnored, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)))
if test.expectsErr {

View File

@ -36,6 +36,7 @@ import (
"k8s.io/kubernetes/plugin/pkg/scheduler/util"
"github.com/golang/glog"
"k8s.io/kubernetes/plugin/pkg/scheduler/volumebinder"
)
type FailedPredicateMap map[string][]algorithm.PredicateFailureReason
@ -91,6 +92,7 @@ type genericScheduler struct {
lastNodeIndex uint64
cachedNodeInfoMap map[string]*schedulercache.NodeInfo
volumeBinder *volumebinder.VolumeBinder
}
// Schedule tries to schedule the given pod to one of node in the node list.
@ -867,7 +869,10 @@ func nodesWherePreemptionMightHelp(pod *v1.Pod, nodes []*v1.Node, failedPredicat
predicates.ErrNodeNotReady,
predicates.ErrNodeNetworkUnavailable,
predicates.ErrNodeUnschedulable,
predicates.ErrNodeUnknownCondition:
predicates.ErrNodeUnknownCondition,
predicates.ErrVolumeZoneConflict,
predicates.ErrVolumeNodeConflict,
predicates.ErrVolumeBindConflict:
unresolvableReasonExist = true
break
// TODO(bsalamat): Please add affinity failure cases once we have specific affinity failure errors.
@ -909,7 +914,8 @@ func NewGenericScheduler(
predicateMetaProducer algorithm.PredicateMetadataProducer,
prioritizers []algorithm.PriorityConfig,
priorityMetaProducer algorithm.MetadataProducer,
extenders []algorithm.SchedulerExtender) algorithm.ScheduleAlgorithm {
extenders []algorithm.SchedulerExtender,
volumeBinder *volumebinder.VolumeBinder) algorithm.ScheduleAlgorithm {
return &genericScheduler{
cache: cache,
equivalenceCache: eCache,
@ -920,5 +926,6 @@ func NewGenericScheduler(
priorityMetaProducer: priorityMetaProducer,
extenders: extenders,
cachedNodeInfoMap: make(map[string]*schedulercache.NodeInfo),
volumeBinder: volumeBinder,
}
}

View File

@ -311,7 +311,7 @@ func TestGenericScheduler(t *testing.T) {
}
scheduler := NewGenericScheduler(
cache, nil, NewSchedulingQueue(), test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, []algorithm.SchedulerExtender{})
cache, nil, NewSchedulingQueue(), test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, []algorithm.SchedulerExtender{}, nil)
machine, err := scheduler.Schedule(test.pod, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)))
if !reflect.DeepEqual(err, test.wErr) {
@ -1190,7 +1190,7 @@ func TestPreempt(t *testing.T) {
extenders = append(extenders, extender)
}
scheduler := NewGenericScheduler(
cache, nil, NewSchedulingQueue(), map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources}, algorithm.EmptyPredicateMetadataProducer, []algorithm.PriorityConfig{{Function: numericPriority, Weight: 1}}, algorithm.EmptyMetadataProducer, extenders)
cache, nil, NewSchedulingQueue(), map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources}, algorithm.EmptyPredicateMetadataProducer, []algorithm.PriorityConfig{{Function: numericPriority, Weight: 1}}, algorithm.EmptyMetadataProducer, extenders, nil)
// Call Preempt and check the expected results.
node, victims, _, err := scheduler.Preempt(test.pod, schedulertesting.FakeNodeLister(makeNodeList(nodeNames)), error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap}))
if err != nil {

View File

@ -37,18 +37,22 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
appsinformers "k8s.io/client-go/informers/apps/v1beta1"
coreinformers "k8s.io/client-go/informers/core/v1"
extensionsinformers "k8s.io/client-go/informers/extensions/v1beta1"
policyinformers "k8s.io/client-go/informers/policy/v1beta1"
storageinformers "k8s.io/client-go/informers/storage/v1"
clientset "k8s.io/client-go/kubernetes"
appslisters "k8s.io/client-go/listers/apps/v1beta1"
corelisters "k8s.io/client-go/listers/core/v1"
extensionslisters "k8s.io/client-go/listers/extensions/v1beta1"
policylisters "k8s.io/client-go/listers/policy/v1beta1"
storagelisters "k8s.io/client-go/listers/storage/v1"
"k8s.io/client-go/tools/cache"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/apis/core/helper"
"k8s.io/kubernetes/pkg/features"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
"k8s.io/kubernetes/plugin/pkg/scheduler"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
@ -58,6 +62,7 @@ import (
"k8s.io/kubernetes/plugin/pkg/scheduler/core"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
"k8s.io/kubernetes/plugin/pkg/scheduler/util"
"k8s.io/kubernetes/plugin/pkg/scheduler/volumebinder"
)
const (
@ -98,6 +103,8 @@ type configFactory struct {
statefulSetLister appslisters.StatefulSetLister
// a means to list all PodDisruptionBudgets
pdbLister policylisters.PodDisruptionBudgetLister
// a means to list all StorageClasses
storageClassLister storagelisters.StorageClassLister
// Close this to stop all reflectors
StopEverything chan struct{}
@ -120,6 +127,9 @@ type configFactory struct {
// Enable equivalence class cache
enableEquivalenceClassCache bool
// Handles volume binding decisions
volumeBinder *volumebinder.VolumeBinder
}
// NewConfigFactory initializes the default implementation of a Configurator To encourage eventual privatization of the struct type, we only
@ -136,12 +146,19 @@ func NewConfigFactory(
statefulSetInformer appsinformers.StatefulSetInformer,
serviceInformer coreinformers.ServiceInformer,
pdbInformer policyinformers.PodDisruptionBudgetInformer,
storageClassInformer storageinformers.StorageClassInformer,
hardPodAffinitySymmetricWeight int32,
enableEquivalenceClassCache bool,
) scheduler.Configurator {
stopEverything := make(chan struct{})
schedulerCache := schedulercache.New(30*time.Second, stopEverything)
// storageClassInformer is only enabled through VolumeScheduling feature gate
var storageClassLister storagelisters.StorageClassLister
if storageClassInformer != nil {
storageClassLister = storageClassInformer.Lister()
}
c := &configFactory{
client: client,
podLister: schedulerCache,
@ -153,6 +170,7 @@ func NewConfigFactory(
replicaSetLister: replicaSetInformer.Lister(),
statefulSetLister: statefulSetInformer.Lister(),
pdbLister: pdbInformer.Lister(),
storageClassLister: storageClassLister,
schedulerCache: schedulerCache,
StopEverything: stopEverything,
schedulerName: schedulerName,
@ -208,9 +226,14 @@ func NewConfigFactory(
}
},
DeleteFunc: func(obj interface{}) {
if err := c.podQueue.Delete(obj.(*v1.Pod)); err != nil {
pod := obj.(*v1.Pod)
if err := c.podQueue.Delete(pod); err != nil {
runtime.HandleError(fmt.Errorf("unable to dequeue %T: %v", obj, err))
}
if c.volumeBinder != nil {
// Volume binder only wants to keep unassigned pods
c.volumeBinder.DeletePodBindings(pod)
}
},
},
},
@ -252,6 +275,7 @@ func NewConfigFactory(
pvcInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.onPvcAdd,
UpdateFunc: c.onPvcUpdate,
DeleteFunc: c.onPvcDelete,
},
)
@ -272,6 +296,11 @@ func NewConfigFactory(
// Existing equivalence cache should not be affected by add/delete RC/Deployment etc,
// it only make sense when pod is scheduled or deleted
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
// Setup volume binder
c.volumeBinder = volumebinder.NewVolumeBinder(client, pvcInformer, pvInformer, nodeInformer, storageClassInformer)
}
return c
}
@ -365,6 +394,12 @@ func (c *configFactory) invalidatePredicatesForPv(pv *v1.PersistentVolume) {
if pv.Spec.AzureDisk != nil {
invalidPredicates.Insert("MaxAzureDiskVolumeCount")
}
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
// Add/delete impacts the available PVs to choose from
invalidPredicates.Insert(predicates.CheckVolumeBinding)
}
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
}
@ -380,6 +415,27 @@ func (c *configFactory) onPvcAdd(obj interface{}) {
c.podQueue.MoveAllToActiveQueue()
}
func (c *configFactory) onPvcUpdate(old, new interface{}) {
if !utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
return
}
if c.enableEquivalenceClassCache {
newPVC, ok := new.(*v1.PersistentVolumeClaim)
if !ok {
glog.Errorf("cannot convert to *v1.PersistentVolumeClaim: %v", new)
return
}
oldPVC, ok := old.(*v1.PersistentVolumeClaim)
if !ok {
glog.Errorf("cannot convert to *v1.PersistentVolumeClaim: %v", old)
return
}
c.invalidatePredicatesForPvcUpdate(oldPVC, newPVC)
}
c.podQueue.MoveAllToActiveQueue()
}
func (c *configFactory) onPvcDelete(obj interface{}) {
if c.enableEquivalenceClassCache {
var pvc *v1.PersistentVolumeClaim
@ -407,6 +463,21 @@ func (c *configFactory) invalidatePredicatesForPvc(pvc *v1.PersistentVolumeClaim
}
}
func (c *configFactory) invalidatePredicatesForPvcUpdate(old, new *v1.PersistentVolumeClaim) {
invalidPredicates := sets.NewString()
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
if old.Spec.VolumeName != new.Spec.VolumeName {
// PVC volume binding has changed
invalidPredicates.Insert(predicates.CheckVolumeBinding)
}
}
if invalidPredicates.Len() > 0 {
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
}
}
func (c *configFactory) onServiceAdd(obj interface{}) {
if c.enableEquivalenceClassCache {
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet)
@ -468,6 +539,7 @@ func (c *configFactory) addPodToCache(obj interface{}) {
}
c.podQueue.AssignedPodAdded(pod)
// NOTE: Updating equivalence cache of addPodToCache has been
// handled optimistically in InvalidateCachedPredicateItemForPodAdd.
}
@ -830,7 +902,8 @@ func (f *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
f.equivalencePodCache = core.NewEquivalenceCache(getEquivalencePodFunc)
glog.Info("Created equivalence class cache")
}
algo := core.NewGenericScheduler(f.schedulerCache, f.equivalencePodCache, f.podQueue, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders)
algo := core.NewGenericScheduler(f.schedulerCache, f.equivalencePodCache, f.podQueue, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders, f.volumeBinder)
podBackoff := util.CreateDefaultPodBackoff()
return &scheduler.Config{
@ -850,6 +923,7 @@ func (f *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
},
Error: f.MakeDefaultErrorFunc(podBackoff, f.podQueue),
StopEverything: f.StopEverything,
VolumeBinder: f.volumeBinder,
}, nil
}
@ -898,15 +972,17 @@ func (f *configFactory) GetPredicates(predicateKeys sets.String) (map[string]alg
func (f *configFactory) getPluginArgs() (*PluginFactoryArgs, error) {
return &PluginFactoryArgs{
PodLister: f.podLister,
ServiceLister: f.serviceLister,
ControllerLister: f.controllerLister,
ReplicaSetLister: f.replicaSetLister,
StatefulSetLister: f.statefulSetLister,
NodeLister: &nodeLister{f.nodeLister},
NodeInfo: &predicates.CachedNodeInfo{NodeLister: f.nodeLister},
PVInfo: &predicates.CachedPersistentVolumeInfo{PersistentVolumeLister: f.pVLister},
PVCInfo: &predicates.CachedPersistentVolumeClaimInfo{PersistentVolumeClaimLister: f.pVCLister},
PodLister: f.podLister,
ServiceLister: f.serviceLister,
ControllerLister: f.controllerLister,
ReplicaSetLister: f.replicaSetLister,
StatefulSetLister: f.statefulSetLister,
NodeLister: &nodeLister{f.nodeLister},
NodeInfo: &predicates.CachedNodeInfo{NodeLister: f.nodeLister},
PVInfo: &predicates.CachedPersistentVolumeInfo{PersistentVolumeLister: f.pVLister},
PVCInfo: &predicates.CachedPersistentVolumeClaimInfo{PersistentVolumeClaimLister: f.pVCLister},
StorageClassInfo: &predicates.CachedStorageClassInfo{StorageClassLister: f.storageClassLister},
VolumeBinder: f.volumeBinder,
HardPodAffinitySymmetricWeight: f.hardPodAffinitySymmetricWeight,
}, nil
}
@ -1047,6 +1123,7 @@ func (factory *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, pod
Namespace: pod.Namespace,
Name: pod.Name,
}
origPod := pod
// When pod priority is enabled, we would like to place an unschedulable
// pod in the unschedulable queue. This ensures that if the pod is nominated
@ -1066,11 +1143,21 @@ func (factory *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, pod
if err == nil {
if len(pod.Spec.NodeName) == 0 {
podQueue.AddUnschedulableIfNotPresent(pod)
} else {
if factory.volumeBinder != nil {
// Volume binder only wants to keep unassigned pods
factory.volumeBinder.DeletePodBindings(pod)
}
}
break
}
if errors.IsNotFound(err) {
glog.Warningf("A pod %v no longer exists", podID)
if factory.volumeBinder != nil {
// Volume binder only wants to keep unassigned pods
factory.volumeBinder.DeletePodBindings(origPod)
}
return
}
glog.Errorf("Error getting pod %v for retry: %v; retrying...", podID, err)

View File

@ -66,6 +66,7 @@ func TestCreate(t *testing.T) {
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory.Storage().V1().StorageClasses(),
v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceCache,
)
@ -99,6 +100,7 @@ func TestCreateFromConfig(t *testing.T) {
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory.Storage().V1().StorageClasses(),
v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceCache,
)
@ -159,6 +161,7 @@ func TestCreateFromConfigWithHardPodAffinitySymmetricWeight(t *testing.T) {
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory.Storage().V1().StorageClasses(),
v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceCache,
)
@ -220,6 +223,7 @@ func TestCreateFromEmptyConfig(t *testing.T) {
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory.Storage().V1().StorageClasses(),
v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceCache,
)
@ -278,6 +282,7 @@ func TestDefaultErrorFunc(t *testing.T) {
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory.Storage().V1().StorageClasses(),
v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceCache,
)
@ -388,6 +393,7 @@ func TestInvalidHardPodAffinitySymmetricWeight(t *testing.T) {
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory.Storage().V1().StorageClasses(),
-1,
enableEquivalenceCache,
)
@ -435,6 +441,7 @@ func TestInvalidFactoryArgs(t *testing.T) {
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory.Storage().V1().StorageClasses(),
test.hardPodAffinitySymmetricWeight,
enableEquivalenceCache,
)

View File

@ -30,6 +30,7 @@ import (
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"github.com/golang/glog"
"k8s.io/kubernetes/plugin/pkg/scheduler/volumebinder"
)
// PluginFactoryArgs are passed to all plugin factory functions.
@ -43,6 +44,8 @@ type PluginFactoryArgs struct {
NodeInfo predicates.NodeInfo
PVInfo predicates.PersistentVolumeInfo
PVCInfo predicates.PersistentVolumeClaimInfo
StorageClassInfo predicates.StorageClassInfo
VolumeBinder *volumebinder.VolumeBinder
HardPodAffinitySymmetricWeight int32
}

View File

@ -26,6 +26,7 @@ import (
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/core"
@ -129,6 +130,8 @@ type Config struct {
// Close this to shut down the scheduler.
StopEverything chan struct{}
VolumeBinder persistentvolume.SchedulerVolumeBinder
}
// NewFromConfigurator returns a new scheduler that is created entirely by the Configurator. Assumes Create() is implemented.

View File

@ -528,7 +528,8 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.
algorithm.EmptyPredicateMetadataProducer,
[]algorithm.PriorityConfig{},
algorithm.EmptyMetadataProducer,
[]algorithm.SchedulerExtender{})
[]algorithm.SchedulerExtender{},
nil)
bindingChan := make(chan *v1.Binding, 1)
errChan := make(chan error, 1)
configurator := &FakeConfigurator{
@ -566,7 +567,8 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
algorithm.EmptyPredicateMetadataProducer,
[]algorithm.PriorityConfig{},
algorithm.EmptyMetadataProducer,
[]algorithm.SchedulerExtender{})
[]algorithm.SchedulerExtender{},
nil)
bindingChan := make(chan *v1.Binding, 2)
configurator := &FakeConfigurator{
Config: &Config{

View File

@ -0,0 +1,74 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package volumebinder
import (
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
storageinformers "k8s.io/client-go/informers/storage/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
)
// VolumeBinder sets up the volume binding library and manages
// the volume binding operations with a queue.
type VolumeBinder struct {
Binder persistentvolume.SchedulerVolumeBinder
BindQueue *workqueue.Type
}
// NewVolumeBinder sets up the volume binding library and binding queue
func NewVolumeBinder(
client clientset.Interface,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
pvInformer coreinformers.PersistentVolumeInformer,
nodeInformer coreinformers.NodeInformer,
storageClassInformer storageinformers.StorageClassInformer) *VolumeBinder {
return &VolumeBinder{
Binder: persistentvolume.NewVolumeBinder(client, pvcInformer, pvInformer, nodeInformer, storageClassInformer),
BindQueue: workqueue.NewNamed("podsToBind"),
}
}
// NewFakeVolumeBinder sets up a fake volume binder and binding queue
func NewFakeVolumeBinder(config *persistentvolume.FakeVolumeBinderConfig) *VolumeBinder {
return &VolumeBinder{
Binder: persistentvolume.NewFakeVolumeBinder(config),
BindQueue: workqueue.NewNamed("podsToBind"),
}
}
// Run starts a goroutine to handle the binding queue with the given function.
func (b *VolumeBinder) Run(bindWorkFunc func(), stopCh <-chan struct{}) {
go wait.Until(bindWorkFunc, time.Second, stopCh)
<-stopCh
b.BindQueue.ShutDown()
}
// DeletePodBindings will delete the cached volume bindings for the given pod.
func (b *VolumeBinder) DeletePodBindings(pod *v1.Pod) {
cache := b.Binder.GetBindingsCache()
if cache != nil && pod != nil {
cache.DeleteBindings(pod)
}
}

View File

@ -369,6 +369,7 @@ func TestSchedulerExtender(t *testing.T) {
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory.Storage().V1().StorageClasses(),
v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceCache,
)

View File

@ -469,6 +469,7 @@ func TestMultiScheduler(t *testing.T) {
informerFactory2.Apps().V1beta1().StatefulSets(),
informerFactory2.Core().V1().Services(),
informerFactory2.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory2.Storage().V1().StorageClasses(),
v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceCache,
)

View File

@ -131,6 +131,7 @@ func TestTaintNodeByCondition(t *testing.T) {
informers.Apps().V1beta1().StatefulSets(),
informers.Core().V1().Services(),
informers.Policy().V1beta1().PodDisruptionBudgets(),
informers.Storage().V1().StorageClasses(),
v1.DefaultHardPodAffinitySymmetricWeight,
true, // Enable EqualCache by default.
)

View File

@ -78,6 +78,7 @@ func initTest(t *testing.T, nsPrefix string) *TestContext {
context.informerFactory.Apps().V1beta1().StatefulSets(),
context.informerFactory.Core().V1().Services(),
context.informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
context.informerFactory.Storage().V1().StorageClasses(),
v1.DefaultHardPodAffinitySymmetricWeight,
true,
)

View File

@ -75,6 +75,7 @@ func mustSetupScheduler() (schedulerConfigurator scheduler.Configurator, destroy
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory.Storage().V1().StorageClasses(),
v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceCache,
)