From 9a5d058e5e30453eac235d7d1a5fa3ad08006f67 Mon Sep 17 00:00:00 2001 From: "Bobby (Babak) Salamat" Date: Mon, 9 Oct 2017 15:37:03 -0700 Subject: [PATCH 1/2] Add PodDisruptionBudget to scheduler cache. --- plugin/cmd/kube-scheduler/app/configurator.go | 3 + plugin/cmd/kube-scheduler/app/server.go | 1 + .../authorizer/rbac/bootstrappolicy/policy.go | 3 +- .../testdata/cluster-roles.yaml | 8 ++ .../defaults/compatibility_test.go | 1 + plugin/pkg/scheduler/factory/factory.go | 66 +++++++++++ plugin/pkg/scheduler/factory/factory_test.go | 9 ++ plugin/pkg/scheduler/schedulercache/cache.go | 36 ++++++ .../scheduler/schedulercache/cache_test.go | 106 ++++++++++++++++++ .../pkg/scheduler/schedulercache/interface.go | 13 +++ plugin/pkg/scheduler/testing/fake_cache.go | 11 ++ test/integration/scheduler/extender_test.go | 1 + test/integration/scheduler/scheduler_test.go | 97 ++++++++++++++++ test/integration/scheduler/taint_test.go | 1 + test/integration/scheduler/util.go | 1 + test/integration/scheduler_perf/util.go | 1 + 16 files changed, 357 insertions(+), 1 deletion(-) diff --git a/plugin/cmd/kube-scheduler/app/configurator.go b/plugin/cmd/kube-scheduler/app/configurator.go index fd3de920531..567a3bfd381 100644 --- a/plugin/cmd/kube-scheduler/app/configurator.go +++ b/plugin/cmd/kube-scheduler/app/configurator.go @@ -24,6 +24,7 @@ import ( appsinformers "k8s.io/client-go/informers/apps/v1beta1" coreinformers "k8s.io/client-go/informers/core/v1" extensionsinformers "k8s.io/client-go/informers/extensions/v1beta1" + policyinformers "k8s.io/client-go/informers/policy/v1beta1" "k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -86,6 +87,7 @@ func CreateScheduler( replicaSetInformer extensionsinformers.ReplicaSetInformer, statefulSetInformer appsinformers.StatefulSetInformer, serviceInformer coreinformers.ServiceInformer, + pdbInformer policyinformers.PodDisruptionBudgetInformer, recorder record.EventRecorder, ) (*scheduler.Scheduler, error) { configurator := factory.NewConfigFactory( @@ -99,6 +101,7 @@ func CreateScheduler( replicaSetInformer, statefulSetInformer, serviceInformer, + pdbInformer, s.HardPodAffinitySymmetricWeight, utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache), ) diff --git a/plugin/cmd/kube-scheduler/app/server.go b/plugin/cmd/kube-scheduler/app/server.go index 8c4ad183445..d94ea41f96d 100644 --- a/plugin/cmd/kube-scheduler/app/server.go +++ b/plugin/cmd/kube-scheduler/app/server.go @@ -95,6 +95,7 @@ func Run(s *options.SchedulerServer) error { informerFactory.Extensions().V1beta1().ReplicaSets(), informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), + informerFactory.Policy().V1beta1().PodDisruptionBudgets(), recorder, ) if err != nil { diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go index bea311c7ce9..0d488b5ab5a 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go @@ -355,7 +355,8 @@ func ClusterRoles() []rbac.ClusterRole { rbac.NewRule(Read...).Groups(legacyGroup).Resources("services", "replicationcontrollers").RuleOrDie(), rbac.NewRule(Read...).Groups(extensionsGroup).Resources("replicasets").RuleOrDie(), rbac.NewRule(Read...).Groups(appsGroup).Resources("statefulsets").RuleOrDie(), - // things that pods use + // things that pods use or applies to them + rbac.NewRule(Read...).Groups(policyGroup).Resources("poddisruptionbudgets").RuleOrDie(), rbac.NewRule(Read...).Groups(legacyGroup).Resources("persistentvolumeclaims", "persistentvolumes").RuleOrDie(), }, }, diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml index 4c6bf1e71a0..43018fea21c 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml @@ -659,6 +659,14 @@ items: - get - list - watch + - apiGroups: + - policy + resources: + - poddisruptionbudgets + verbs: + - get + - list + - watch - apiGroups: - "" resources: diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go b/plugin/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go index cdca930a76d..8fe4a9d68c3 100644 --- a/plugin/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go +++ b/plugin/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go @@ -505,6 +505,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { informerFactory.Extensions().V1beta1().ReplicaSets(), informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), + informerFactory.Policy().V1beta1().PodDisruptionBudgets(), v1.DefaultHardPodAffinitySymmetricWeight, enableEquivalenceCache, ).CreateFromConfig(policy); err != nil { diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index 0d46abb7980..5f4dfdfaf0f 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -27,6 +27,7 @@ import ( "github.com/golang/glog" "k8s.io/api/core/v1" + "k8s.io/api/policy/v1beta1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -39,10 +40,12 @@ import ( appsinformers "k8s.io/client-go/informers/apps/v1beta1" coreinformers "k8s.io/client-go/informers/core/v1" extensionsinformers "k8s.io/client-go/informers/extensions/v1beta1" + policyinformers "k8s.io/client-go/informers/policy/v1beta1" clientset "k8s.io/client-go/kubernetes" appslisters "k8s.io/client-go/listers/apps/v1beta1" corelisters "k8s.io/client-go/listers/core/v1" extensionslisters "k8s.io/client-go/listers/extensions/v1beta1" + policylisters "k8s.io/client-go/listers/policy/v1beta1" "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/api/helper" podutil "k8s.io/kubernetes/pkg/api/v1/pod" @@ -93,6 +96,8 @@ type configFactory struct { replicaSetLister extensionslisters.ReplicaSetLister // a means to list all statefulsets statefulSetLister appslisters.StatefulSetLister + // a means to list all PodDisruptionBudgets + pdbLister policylisters.PodDisruptionBudgetLister // Close this to stop all reflectors StopEverything chan struct{} @@ -130,6 +135,7 @@ func NewConfigFactory( replicaSetInformer extensionsinformers.ReplicaSetInformer, statefulSetInformer appsinformers.StatefulSetInformer, serviceInformer coreinformers.ServiceInformer, + pdbInformer policyinformers.PodDisruptionBudgetInformer, hardPodAffinitySymmetricWeight int, enableEquivalenceClassCache bool, ) scheduler.Configurator { @@ -146,6 +152,7 @@ func NewConfigFactory( controllerLister: replicationControllerInformer.Lister(), replicaSetLister: replicaSetInformer.Lister(), statefulSetLister: statefulSetInformer.Lister(), + pdbLister: pdbInformer.Lister(), schedulerCache: schedulerCache, StopEverything: stopEverything, schedulerName: schedulerName, @@ -220,6 +227,15 @@ func NewConfigFactory( ) c.nodeLister = nodeInformer.Lister() + pdbInformer.Informer().AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.addPDBToCache, + UpdateFunc: c.updatePDBInCache, + DeleteFunc: c.deletePDBFromCache, + }, + ) + c.pdbLister = pdbInformer.Lister() + // On add and delete of PVs, it will affect equivalence cache items // related to persistent volume pvInformer.Informer().AddEventHandler( @@ -654,6 +670,56 @@ func (c *configFactory) deleteNodeFromCache(obj interface{}) { } } +func (c *configFactory) addPDBToCache(obj interface{}) { + pdb, ok := obj.(*v1beta1.PodDisruptionBudget) + if !ok { + glog.Errorf("cannot convert to *v1beta1.PodDisruptionBudget: %v", obj) + return + } + + if err := c.schedulerCache.AddPDB(pdb); err != nil { + glog.Errorf("scheduler cache AddPDB failed: %v", err) + } +} + +func (c *configFactory) updatePDBInCache(oldObj, newObj interface{}) { + oldPDB, ok := oldObj.(*v1beta1.PodDisruptionBudget) + if !ok { + glog.Errorf("cannot convert oldObj to *v1beta1.PodDisruptionBudget: %v", oldObj) + return + } + newPDB, ok := newObj.(*v1beta1.PodDisruptionBudget) + if !ok { + glog.Errorf("cannot convert newObj to *v1beta1.PodDisruptionBudget: %v", newObj) + return + } + + if err := c.schedulerCache.UpdatePDB(oldPDB, newPDB); err != nil { + glog.Errorf("scheduler cache UpdatePDB failed: %v", err) + } +} + +func (c *configFactory) deletePDBFromCache(obj interface{}) { + var pdb *v1beta1.PodDisruptionBudget + switch t := obj.(type) { + case *v1beta1.PodDisruptionBudget: + pdb = t + case cache.DeletedFinalStateUnknown: + var ok bool + pdb, ok = t.Obj.(*v1beta1.PodDisruptionBudget) + if !ok { + glog.Errorf("cannot convert to *v1beta1.PodDisruptionBudget: %v", t.Obj) + return + } + default: + glog.Errorf("cannot convert to *v1beta1.PodDisruptionBudget: %v", t) + return + } + if err := c.schedulerCache.RemovePDB(pdb); err != nil { + glog.Errorf("scheduler cache RemovePDB failed: %v", err) + } +} + // Create creates a scheduler with the default algorithm provider. func (f *configFactory) Create() (*scheduler.Config, error) { return f.CreateFromProvider(DefaultProvider) diff --git a/plugin/pkg/scheduler/factory/factory_test.go b/plugin/pkg/scheduler/factory/factory_test.go index f4c95c7f1e6..fcfe1d51161 100644 --- a/plugin/pkg/scheduler/factory/factory_test.go +++ b/plugin/pkg/scheduler/factory/factory_test.go @@ -64,6 +64,7 @@ func TestCreate(t *testing.T) { informerFactory.Extensions().V1beta1().ReplicaSets(), informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), + informerFactory.Policy().V1beta1().PodDisruptionBudgets(), v1.DefaultHardPodAffinitySymmetricWeight, enableEquivalenceCache, ) @@ -96,6 +97,7 @@ func TestCreateFromConfig(t *testing.T) { informerFactory.Extensions().V1beta1().ReplicaSets(), informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), + informerFactory.Policy().V1beta1().PodDisruptionBudgets(), v1.DefaultHardPodAffinitySymmetricWeight, enableEquivalenceCache, ) @@ -155,6 +157,7 @@ func TestCreateFromConfigWithHardPodAffinitySymmetricWeight(t *testing.T) { informerFactory.Extensions().V1beta1().ReplicaSets(), informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), + informerFactory.Policy().V1beta1().PodDisruptionBudgets(), v1.DefaultHardPodAffinitySymmetricWeight, enableEquivalenceCache, ) @@ -215,6 +218,7 @@ func TestCreateFromEmptyConfig(t *testing.T) { informerFactory.Extensions().V1beta1().ReplicaSets(), informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), + informerFactory.Policy().V1beta1().PodDisruptionBudgets(), v1.DefaultHardPodAffinitySymmetricWeight, enableEquivalenceCache, ) @@ -272,6 +276,7 @@ func TestDefaultErrorFunc(t *testing.T) { informerFactory.Extensions().V1beta1().ReplicaSets(), informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), + informerFactory.Policy().V1beta1().PodDisruptionBudgets(), v1.DefaultHardPodAffinitySymmetricWeight, enableEquivalenceCache, ) @@ -385,6 +390,7 @@ func TestResponsibleForPod(t *testing.T) { informerFactory.Extensions().V1beta1().ReplicaSets(), informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), + informerFactory.Policy().V1beta1().PodDisruptionBudgets(), v1.DefaultHardPodAffinitySymmetricWeight, enableEquivalenceCache, ) @@ -400,6 +406,7 @@ func TestResponsibleForPod(t *testing.T) { informerFactory.Extensions().V1beta1().ReplicaSets(), informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), + informerFactory.Policy().V1beta1().PodDisruptionBudgets(), v1.DefaultHardPodAffinitySymmetricWeight, enableEquivalenceCache, ) @@ -470,6 +477,7 @@ func TestInvalidHardPodAffinitySymmetricWeight(t *testing.T) { informerFactory.Extensions().V1beta1().ReplicaSets(), informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), + informerFactory.Policy().V1beta1().PodDisruptionBudgets(), -1, enableEquivalenceCache, ) @@ -516,6 +524,7 @@ func TestInvalidFactoryArgs(t *testing.T) { informerFactory.Extensions().V1beta1().ReplicaSets(), informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), + informerFactory.Policy().V1beta1().PodDisruptionBudgets(), test.hardPodAffinitySymmetricWeight, enableEquivalenceCache, ) diff --git a/plugin/pkg/scheduler/schedulercache/cache.go b/plugin/pkg/scheduler/schedulercache/cache.go index 9b2dfd7ea77..e37ef233a67 100644 --- a/plugin/pkg/scheduler/schedulercache/cache.go +++ b/plugin/pkg/scheduler/schedulercache/cache.go @@ -26,6 +26,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "github.com/golang/glog" + policy "k8s.io/api/policy/v1beta1" ) var ( @@ -55,6 +56,7 @@ type schedulerCache struct { // a map from pod key to podState. podStates map[string]*podState nodes map[string]*NodeInfo + pdbs map[string]*policy.PodDisruptionBudget } type podState struct { @@ -74,6 +76,7 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul nodes: make(map[string]*NodeInfo), assumedPods: make(map[string]bool), podStates: make(map[string]*podState), + pdbs: make(map[string]*policy.PodDisruptionBudget), } } @@ -382,6 +385,39 @@ func (cache *schedulerCache) RemoveNode(node *v1.Node) error { return nil } +func (cache *schedulerCache) AddPDB(pdb *policy.PodDisruptionBudget) error { + cache.mu.Lock() + defer cache.mu.Unlock() + + // Unconditionally update cache. + cache.pdbs[pdb.Name] = pdb + return nil +} + +func (cache *schedulerCache) UpdatePDB(oldPDB, newPDB *policy.PodDisruptionBudget) error { + return cache.AddPDB(newPDB) +} + +func (cache *schedulerCache) RemovePDB(pdb *policy.PodDisruptionBudget) error { + cache.mu.Lock() + defer cache.mu.Unlock() + + delete(cache.pdbs, pdb.Name) + return nil +} + +func (cache *schedulerCache) ListPDBs(selector labels.Selector) ([]*policy.PodDisruptionBudget, error) { + cache.mu.Lock() + defer cache.mu.Unlock() + var pdbs []*policy.PodDisruptionBudget + for _, pdb := range cache.pdbs { + if selector.Matches(labels.Set(pdb.Labels)) { + pdbs = append(pdbs, pdb) + } + } + return pdbs, nil +} + func (cache *schedulerCache) run() { go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop) } diff --git a/plugin/pkg/scheduler/schedulercache/cache_test.go b/plugin/pkg/scheduler/schedulercache/cache_test.go index f27c7b5ef7b..45d44a09be4 100644 --- a/plugin/pkg/scheduler/schedulercache/cache_test.go +++ b/plugin/pkg/scheduler/schedulercache/cache_test.go @@ -24,9 +24,11 @@ import ( "time" "k8s.io/api/core/v1" + "k8s.io/api/policy/v1beta1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/intstr" v1helper "k8s.io/kubernetes/pkg/api/v1/helper" priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util" schedutil "k8s.io/kubernetes/plugin/pkg/scheduler/util" @@ -900,3 +902,107 @@ func setupCacheWithAssumedPods(b *testing.B, podNum int, assumedTime time.Time) } return cache } + +func makePDB(name, namespace string, labels map[string]string, minAvailable int) *v1beta1.PodDisruptionBudget { + intstrMin := intstr.FromInt(minAvailable) + pdb := &v1beta1.PodDisruptionBudget{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + Labels: labels, + }, + Spec: v1beta1.PodDisruptionBudgetSpec{ + MinAvailable: &intstrMin, + Selector: &metav1.LabelSelector{MatchLabels: labels}, + }, + } + + return pdb +} + +// TestPDBOperations tests that a PDB will be add/updated/deleted correctly. +func TestPDBOperations(t *testing.T) { + ttl := 10 * time.Second + testPDBs := []*v1beta1.PodDisruptionBudget{ + makePDB("pdb0", "ns1", map[string]string{"tkey1": "tval1"}, 3), + makePDB("pdb1", "ns1", map[string]string{"tkey1": "tval1", "tkey2": "tval2"}, 1), + makePDB("pdb2", "ns3", map[string]string{"tkey3": "tval3", "tkey2": "tval2"}, 10), + } + updatedPDBs := []*v1beta1.PodDisruptionBudget{ + makePDB("pdb0", "ns1", map[string]string{"tkey4": "tval4"}, 8), + makePDB("pdb1", "ns1", map[string]string{"tkey1": "tval1"}, 1), + makePDB("pdb2", "ns3", map[string]string{"tkey3": "tval3", "tkey1": "tval1", "tkey2": "tval2"}, 10), + } + tests := []struct { + pdbsToAdd []*v1beta1.PodDisruptionBudget + pdbsToUpdate []*v1beta1.PodDisruptionBudget + pdbsToDelete []*v1beta1.PodDisruptionBudget + expectedPDBs []*v1beta1.PodDisruptionBudget // Expected PDBs after all operations + }{ + { + pdbsToAdd: []*v1beta1.PodDisruptionBudget{testPDBs[0]}, + pdbsToUpdate: []*v1beta1.PodDisruptionBudget{testPDBs[0], testPDBs[1], testPDBs[0]}, + expectedPDBs: []*v1beta1.PodDisruptionBudget{testPDBs[0], testPDBs[1]}, // both will be in the cache as they have different names + }, + { + pdbsToAdd: []*v1beta1.PodDisruptionBudget{testPDBs[0]}, + pdbsToUpdate: []*v1beta1.PodDisruptionBudget{testPDBs[0], updatedPDBs[0]}, + expectedPDBs: []*v1beta1.PodDisruptionBudget{updatedPDBs[0]}, + }, + { + pdbsToAdd: []*v1beta1.PodDisruptionBudget{testPDBs[0], testPDBs[2]}, + pdbsToUpdate: []*v1beta1.PodDisruptionBudget{testPDBs[0], updatedPDBs[0]}, + pdbsToDelete: []*v1beta1.PodDisruptionBudget{testPDBs[0]}, + expectedPDBs: []*v1beta1.PodDisruptionBudget{testPDBs[2]}, + }, + } + + for _, test := range tests { + cache := newSchedulerCache(ttl, time.Second, nil) + for _, pdbToAdd := range test.pdbsToAdd { + if err := cache.AddPDB(pdbToAdd); err != nil { + t.Fatalf("AddPDB failed: %v", err) + } + } + + for i := range test.pdbsToUpdate { + if i == 0 { + continue + } + if err := cache.UpdatePDB(test.pdbsToUpdate[i-1], test.pdbsToUpdate[i]); err != nil { + t.Fatalf("UpdatePDB failed: %v", err) + } + } + + for _, pdb := range test.pdbsToDelete { + if err := cache.RemovePDB(pdb); err != nil { + t.Fatalf("RemovePDB failed: %v", err) + } + } + + cachedPDBs, err := cache.ListPDBs(labels.Everything()) + if err != nil { + t.Fatalf("ListPDBs failed: %v", err) + } + if len(cachedPDBs) != len(test.expectedPDBs) { + t.Errorf("Expected %d PDBs, got %d", len(test.expectedPDBs), len(cachedPDBs)) + } + for _, pdb := range test.expectedPDBs { + found := false + // find it among the cached ones + for _, cpdb := range cachedPDBs { + if pdb.Name == cpdb.Name { + found = true + if !reflect.DeepEqual(pdb, cpdb) { + t.Errorf("%v is not equal to %v", pdb, cpdb) + } + break + } + } + if !found { + t.Errorf("PDB with name '%v' was not found in the cache.", pdb.Name) + } + + } + } +} diff --git a/plugin/pkg/scheduler/schedulercache/interface.go b/plugin/pkg/scheduler/schedulercache/interface.go index a1ef77ea4fa..d8c09fb7e06 100644 --- a/plugin/pkg/scheduler/schedulercache/interface.go +++ b/plugin/pkg/scheduler/schedulercache/interface.go @@ -18,6 +18,7 @@ package schedulercache import ( "k8s.io/api/core/v1" + policy "k8s.io/api/policy/v1beta1" "k8s.io/apimachinery/pkg/labels" ) @@ -95,6 +96,18 @@ type Cache interface { // RemoveNode removes overall information about node. RemoveNode(node *v1.Node) error + // AddPDB adds a PodDisruptionBudget object to the cache. + AddPDB(pdb *policy.PodDisruptionBudget) error + + // UpdatePDB updates a PodDisruptionBudget object in the cache. + UpdatePDB(oldPDB, newPDB *policy.PodDisruptionBudget) error + + // RemovePDB removes a PodDisruptionBudget object from the cache. + RemovePDB(pdb *policy.PodDisruptionBudget) error + + // List lists all cached PDBs matching the selector. + ListPDBs(selector labels.Selector) ([]*policy.PodDisruptionBudget, error) + // UpdateNodeNameToInfoMap updates the passed infoMap to the current contents of Cache. // The node info contains aggregated information of pods scheduled (including assumed to be) // on this node. diff --git a/plugin/pkg/scheduler/testing/fake_cache.go b/plugin/pkg/scheduler/testing/fake_cache.go index f0de231e579..feeb048e52b 100644 --- a/plugin/pkg/scheduler/testing/fake_cache.go +++ b/plugin/pkg/scheduler/testing/fake_cache.go @@ -18,6 +18,7 @@ package testing import ( "k8s.io/api/core/v1" + policy "k8s.io/api/policy/v1beta1" "k8s.io/apimachinery/pkg/labels" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) @@ -66,6 +67,16 @@ func (f *FakeCache) UpdateNodeNameToInfoMap(infoMap map[string]*schedulercache.N return nil } +func (f *FakeCache) AddPDB(pdb *policy.PodDisruptionBudget) error { return nil } + +func (f *FakeCache) UpdatePDB(oldPDB, newPDB *policy.PodDisruptionBudget) error { return nil } + +func (f *FakeCache) RemovePDB(pdb *policy.PodDisruptionBudget) error { return nil } + +func (f *FakeCache) ListPDBs(selector labels.Selector) ([]*policy.PodDisruptionBudget, error) { + return nil, nil +} + func (f *FakeCache) List(s labels.Selector) ([]*v1.Pod, error) { return nil, nil } func (f *FakeCache) FilteredList(filter schedulercache.PodFilter, selector labels.Selector) ([]*v1.Pod, error) { diff --git a/test/integration/scheduler/extender_test.go b/test/integration/scheduler/extender_test.go index f43cc8eaeff..0cd189ae45d 100644 --- a/test/integration/scheduler/extender_test.go +++ b/test/integration/scheduler/extender_test.go @@ -368,6 +368,7 @@ func TestSchedulerExtender(t *testing.T) { informerFactory.Extensions().V1beta1().ReplicaSets(), informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), + informerFactory.Policy().V1beta1().PodDisruptionBudgets(), v1.DefaultHardPodAffinitySymmetricWeight, enableEquivalenceCache, ) diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index 46ad59a66b6..80c68a4e28c 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -20,13 +20,18 @@ package scheduler import ( "fmt" + "reflect" "testing" "time" "k8s.io/api/core/v1" + policy "k8s.io/api/policy/v1beta1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/diff" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" @@ -134,6 +139,7 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) { informerFactory.Extensions().V1beta1().ReplicaSets(), informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), + informerFactory.Policy().V1beta1().PodDisruptionBudgets(), eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: v1.DefaultSchedulerName}), ) if err != nil { @@ -186,6 +192,7 @@ func TestSchedulerCreationFromNonExistentConfigMap(t *testing.T) { informerFactory.Extensions().V1beta1().ReplicaSets(), informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), + informerFactory.Policy().V1beta1().PodDisruptionBudgets(), eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: v1.DefaultSchedulerName}), ) @@ -224,6 +231,7 @@ func TestSchedulerCreationInLegacyMode(t *testing.T) { informerFactory.Extensions().V1beta1().ReplicaSets(), informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), + informerFactory.Policy().V1beta1().PodDisruptionBudgets(), eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: v1.DefaultSchedulerName}), ) if err != nil { @@ -509,6 +517,7 @@ func TestMultiScheduler(t *testing.T) { informerFactory2.Extensions().V1beta1().ReplicaSets(), informerFactory2.Apps().V1beta1().StatefulSets(), informerFactory2.Core().V1().Services(), + informerFactory2.Policy().V1beta1().PodDisruptionBudgets(), v1.DefaultHardPodAffinitySymmetricWeight, enableEquivalenceCache, ) @@ -900,3 +909,91 @@ func TestPreemption(t *testing.T) { } } } + +// TestPDBCache verifies that scheduler cache works as expected when handling +// PodDisruptionBudget. +func TestPDBCache(t *testing.T) { + context := initTest(t, "pdbcache") + defer cleanupTest(t, context) + + intstrMin := intstr.FromInt(4) + pdb := &policy.PodDisruptionBudget{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: context.ns.Name, + Name: "test-pdb", + Labels: map[string]string{"tkey1": "tval1", "tkey2": "tval2"}, + }, + Spec: policy.PodDisruptionBudgetSpec{ + MinAvailable: &intstrMin, + Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"tkey": "tvalue"}}, + }, + } + + createdPDB, err := context.clientSet.PolicyV1beta1().PodDisruptionBudgets(context.ns.Name).Create(pdb) + if err != nil { + t.Errorf("Failed to create PDB: %v", err) + } + // Wait for PDB to show up in the scheduler's cache. + if err = wait.Poll(time.Second, 15*time.Second, func() (bool, error) { + cachedPDBs, err := context.scheduler.Config().SchedulerCache.ListPDBs(labels.Everything()) + if err != nil { + t.Errorf("Error while polling for PDB: %v", err) + return false, err + } + return len(cachedPDBs) > 0, err + }); err != nil { + t.Fatalf("No PDB was added to the cache: %v", err) + } + // Read PDB from the cache and compare it. + cachedPDBs, err := context.scheduler.Config().SchedulerCache.ListPDBs(labels.Everything()) + if len(cachedPDBs) != 1 { + t.Fatalf("Expected to have 1 pdb in cache, but found %d.", len(cachedPDBs)) + } + if !reflect.DeepEqual(createdPDB, cachedPDBs[0]) { + t.Errorf("Got different PDB than expected.\nDifference detected on:\n%s", diff.ObjectReflectDiff(createdPDB, cachedPDBs[0])) + } + + // Update PDB and change its labels. + pdbCopy := *cachedPDBs[0] + pdbCopy.Labels = map[string]string{} + updatedPDB, err := context.clientSet.PolicyV1beta1().PodDisruptionBudgets(context.ns.Name).Update(&pdbCopy) + if err != nil { + t.Errorf("Failed to update PDB: %v", err) + } + // Wait for PDB to be updated in the scheduler's cache. + if err = wait.Poll(time.Second, 15*time.Second, func() (bool, error) { + cachedPDBs, err := context.scheduler.Config().SchedulerCache.ListPDBs(labels.Everything()) + if err != nil { + t.Errorf("Error while polling for PDB: %v", err) + return false, err + } + return len(cachedPDBs[0].Labels) == 0, err + }); err != nil { + t.Fatalf("No PDB was updated in the cache: %v", err) + } + // Read PDB from the cache and compare it. + cachedPDBs, err = context.scheduler.Config().SchedulerCache.ListPDBs(labels.Everything()) + if len(cachedPDBs) != 1 { + t.Errorf("Expected to have 1 pdb in cache, but found %d.", len(cachedPDBs)) + } + if !reflect.DeepEqual(updatedPDB, cachedPDBs[0]) { + t.Errorf("Got different PDB than expected.\nDifference detected on:\n%s", diff.ObjectReflectDiff(updatedPDB, cachedPDBs[0])) + } + + // Delete PDB. + err = context.clientSet.PolicyV1beta1().PodDisruptionBudgets(context.ns.Name).Delete(pdb.Name, &metav1.DeleteOptions{}) + if err != nil { + t.Errorf("Failed to delete PDB: %v", err) + } + // Wait for PDB to be deleted from the scheduler's cache. + if err = wait.Poll(time.Second, 15*time.Second, func() (bool, error) { + cachedPDBs, err := context.scheduler.Config().SchedulerCache.ListPDBs(labels.Everything()) + if err != nil { + t.Errorf("Error while polling for PDB: %v", err) + return false, err + } + return len(cachedPDBs) == 0, err + }); err != nil { + t.Errorf("No PDB was deleted from the cache: %v", err) + } +} diff --git a/test/integration/scheduler/taint_test.go b/test/integration/scheduler/taint_test.go index c91b5323a96..b0ad30457eb 100644 --- a/test/integration/scheduler/taint_test.go +++ b/test/integration/scheduler/taint_test.go @@ -130,6 +130,7 @@ func TestTaintNodeByCondition(t *testing.T) { informers.Extensions().V1beta1().ReplicaSets(), informers.Apps().V1beta1().StatefulSets(), informers.Core().V1().Services(), + informers.Policy().V1beta1().PodDisruptionBudgets(), v1.DefaultHardPodAffinitySymmetricWeight, true, // Enable EqualCache by default. ) diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go index 8e9ba588a95..ea713e16f4f 100644 --- a/test/integration/scheduler/util.go +++ b/test/integration/scheduler/util.go @@ -76,6 +76,7 @@ func initTest(t *testing.T, nsPrefix string) *TestContext { context.informerFactory.Extensions().V1beta1().ReplicaSets(), context.informerFactory.Apps().V1beta1().StatefulSets(), context.informerFactory.Core().V1().Services(), + context.informerFactory.Policy().V1beta1().PodDisruptionBudgets(), v1.DefaultHardPodAffinitySymmetricWeight, true, ) diff --git a/test/integration/scheduler_perf/util.go b/test/integration/scheduler_perf/util.go index 51f593cc86a..3fc29da0ea6 100644 --- a/test/integration/scheduler_perf/util.go +++ b/test/integration/scheduler_perf/util.go @@ -74,6 +74,7 @@ func mustSetupScheduler() (schedulerConfigurator scheduler.Configurator, destroy informerFactory.Extensions().V1beta1().ReplicaSets(), informerFactory.Apps().V1beta1().StatefulSets(), informerFactory.Core().V1().Services(), + informerFactory.Policy().V1beta1().PodDisruptionBudgets(), v1.DefaultHardPodAffinitySymmetricWeight, enableEquivalenceCache, ) From efc151f46b3ffd8b431f1336a33f99884b57244e Mon Sep 17 00:00:00 2001 From: "Bobby (Babak) Salamat" Date: Fri, 13 Oct 2017 14:08:18 -0700 Subject: [PATCH 2/2] Autogenerated files --- plugin/cmd/kube-scheduler/app/BUILD | 1 + plugin/pkg/scheduler/factory/BUILD | 3 +++ plugin/pkg/scheduler/schedulercache/BUILD | 3 +++ plugin/pkg/scheduler/testing/BUILD | 1 + test/integration/scheduler/BUILD | 4 ++++ 5 files changed, 12 insertions(+) diff --git a/plugin/cmd/kube-scheduler/app/BUILD b/plugin/cmd/kube-scheduler/app/BUILD index d3e3e8d6386..41cb2b323ea 100644 --- a/plugin/cmd/kube-scheduler/app/BUILD +++ b/plugin/cmd/kube-scheduler/app/BUILD @@ -39,6 +39,7 @@ go_library( "//vendor/k8s.io/client-go/informers/apps/v1beta1:go_default_library", "//vendor/k8s.io/client-go/informers/core/v1:go_default_library", "//vendor/k8s.io/client-go/informers/extensions/v1beta1:go_default_library", + "//vendor/k8s.io/client-go/informers/policy/v1beta1:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", diff --git a/plugin/pkg/scheduler/factory/BUILD b/plugin/pkg/scheduler/factory/BUILD index 8a65781a77c..a3b970ef2fa 100644 --- a/plugin/pkg/scheduler/factory/BUILD +++ b/plugin/pkg/scheduler/factory/BUILD @@ -28,6 +28,7 @@ go_library( "//plugin/pkg/scheduler/util:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/api/policy/v1beta1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library", @@ -40,10 +41,12 @@ go_library( "//vendor/k8s.io/client-go/informers/apps/v1beta1:go_default_library", "//vendor/k8s.io/client-go/informers/core/v1:go_default_library", "//vendor/k8s.io/client-go/informers/extensions/v1beta1:go_default_library", + "//vendor/k8s.io/client-go/informers/policy/v1beta1:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/listers/apps/v1beta1:go_default_library", "//vendor/k8s.io/client-go/listers/core/v1:go_default_library", "//vendor/k8s.io/client-go/listers/extensions/v1beta1:go_default_library", + "//vendor/k8s.io/client-go/listers/policy/v1beta1:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", ], ) diff --git a/plugin/pkg/scheduler/schedulercache/BUILD b/plugin/pkg/scheduler/schedulercache/BUILD index 824e11cd29b..e75a4170887 100644 --- a/plugin/pkg/scheduler/schedulercache/BUILD +++ b/plugin/pkg/scheduler/schedulercache/BUILD @@ -21,6 +21,7 @@ go_library( "//plugin/pkg/scheduler/util:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/api/policy/v1beta1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", @@ -38,9 +39,11 @@ go_test( "//plugin/pkg/scheduler/algorithm/priorities/util:go_default_library", "//plugin/pkg/scheduler/util:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/api/policy/v1beta1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library", ], ) diff --git a/plugin/pkg/scheduler/testing/BUILD b/plugin/pkg/scheduler/testing/BUILD index 804d32d330b..03ab8639c1e 100644 --- a/plugin/pkg/scheduler/testing/BUILD +++ b/plugin/pkg/scheduler/testing/BUILD @@ -19,6 +19,7 @@ go_library( "//vendor/k8s.io/api/apps/v1beta1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/extensions/v1beta1:go_default_library", + "//vendor/k8s.io/api/policy/v1beta1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", ], diff --git a/test/integration/scheduler/BUILD b/test/integration/scheduler/BUILD index d96815a97c6..800d66541a8 100644 --- a/test/integration/scheduler/BUILD +++ b/test/integration/scheduler/BUILD @@ -44,9 +44,13 @@ go_test( "//test/integration/framework:go_default_library", "//test/utils:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/api/policy/v1beta1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/diff:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library",