From f340f8baf800ae3e663f141c1c7f53c7504e21f4 Mon Sep 17 00:00:00 2001 From: "Bobby (Babak) Salamat" Date: Tue, 18 Sep 2018 17:05:48 -0700 Subject: [PATCH] Remove PDB and its event handlers from the scheduler cache --- pkg/scheduler/algorithm/types.go | 7 ++ pkg/scheduler/cache/cache.go | 42 ------- pkg/scheduler/cache/cache_test.go | 107 ------------------ pkg/scheduler/cache/interface.go | 14 --- pkg/scheduler/core/extender_test.go | 1 + pkg/scheduler/core/generic_scheduler.go | 5 +- pkg/scheduler/core/generic_scheduler_test.go | 11 +- pkg/scheduler/factory/cache_comparer.go | 26 ----- pkg/scheduler/factory/cache_comparer_test.go | 66 ----------- pkg/scheduler/factory/factory.go | 63 +---------- pkg/scheduler/factory/plugins.go | 1 + pkg/scheduler/scheduler_test.go | 2 + pkg/scheduler/testing/fake_cache.go | 15 --- pkg/scheduler/testing/fake_lister.go | 9 ++ test/integration/scheduler/preemption_test.go | 4 +- test/integration/scheduler/scheduler_test.go | 95 ---------------- test/integration/scheduler/util.go | 10 +- 17 files changed, 41 insertions(+), 437 deletions(-) diff --git a/pkg/scheduler/algorithm/types.go b/pkg/scheduler/algorithm/types.go index cd1535cb558..0ad777e9164 100644 --- a/pkg/scheduler/algorithm/types.go +++ b/pkg/scheduler/algorithm/types.go @@ -19,6 +19,7 @@ package algorithm import ( apps "k8s.io/api/apps/v1" "k8s.io/api/core/v1" + policyv1beta1 "k8s.io/api/policy/v1beta1" "k8s.io/apimachinery/pkg/labels" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" @@ -122,6 +123,12 @@ type ReplicaSetLister interface { GetPodReplicaSets(*v1.Pod) ([]*apps.ReplicaSet, error) } +// PDBLister interface represents anything that can list PodDisruptionBudget objects. +type PDBLister interface { + // List() returns a list of PodDisruptionBudgets matching the selector. + List(labels.Selector) ([]*policyv1beta1.PodDisruptionBudget, error) +} + var _ ControllerLister = &EmptyControllerLister{} // EmptyControllerLister implements ControllerLister on []v1.ReplicationController returning empty data diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index 0be65128497..439b93596c6 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -29,7 +29,6 @@ import ( "k8s.io/kubernetes/pkg/features" "github.com/golang/glog" - policy "k8s.io/api/policy/v1beta1" ) var ( @@ -60,7 +59,6 @@ type schedulerCache struct { podStates map[string]*podState nodes map[string]*NodeInfo nodeTree *NodeTree - pdbs map[string]*policy.PodDisruptionBudget // A map from image name to its imageState. imageStates map[string]*imageState } @@ -106,7 +104,6 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul nodeTree: newNodeTree(nil), assumedPods: make(map[string]bool), podStates: make(map[string]*podState), - pdbs: make(map[string]*policy.PodDisruptionBudget), imageStates: make(map[string]*imageState), } } @@ -127,15 +124,9 @@ func (cache *schedulerCache) Snapshot() *Snapshot { assumedPods[k] = v } - pdbs := make(map[string]*policy.PodDisruptionBudget) - for k, v := range cache.pdbs { - pdbs[k] = v.DeepCopy() - } - return &Snapshot{ Nodes: nodes, AssumedPods: assumedPods, - Pdbs: pdbs, } } @@ -522,39 +513,6 @@ func (cache *schedulerCache) removeNodeImageStates(node *v1.Node) { } } -func (cache *schedulerCache) AddPDB(pdb *policy.PodDisruptionBudget) error { - cache.mu.Lock() - defer cache.mu.Unlock() - - // Unconditionally update cache. - cache.pdbs[string(pdb.UID)] = pdb - return nil -} - -func (cache *schedulerCache) UpdatePDB(oldPDB, newPDB *policy.PodDisruptionBudget) error { - return cache.AddPDB(newPDB) -} - -func (cache *schedulerCache) RemovePDB(pdb *policy.PodDisruptionBudget) error { - cache.mu.Lock() - defer cache.mu.Unlock() - - delete(cache.pdbs, string(pdb.UID)) - return nil -} - -func (cache *schedulerCache) ListPDBs(selector labels.Selector) ([]*policy.PodDisruptionBudget, error) { - cache.mu.RLock() - defer cache.mu.RUnlock() - var pdbs []*policy.PodDisruptionBudget - for _, pdb := range cache.pdbs { - if selector.Matches(labels.Set(pdb.Labels)) { - pdbs = append(pdbs, pdb) - } - } - return pdbs, nil -} - func (cache *schedulerCache) run() { go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop) } diff --git a/pkg/scheduler/cache/cache_test.go b/pkg/scheduler/cache/cache_test.go index 01837ca5698..f78aa915de4 100644 --- a/pkg/scheduler/cache/cache_test.go +++ b/pkg/scheduler/cache/cache_test.go @@ -24,12 +24,10 @@ import ( "time" "k8s.io/api/core/v1" - "k8s.io/api/policy/v1beta1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/intstr" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/kubernetes/pkg/features" priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util" @@ -1230,108 +1228,3 @@ func setupCacheWithAssumedPods(b *testing.B, podNum int, assumedTime time.Time) } return cache } - -func makePDB(name, namespace string, uid types.UID, labels map[string]string, minAvailable int) *v1beta1.PodDisruptionBudget { - intstrMin := intstr.FromInt(minAvailable) - pdb := &v1beta1.PodDisruptionBudget{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: namespace, - Name: name, - Labels: labels, - UID: uid, - }, - Spec: v1beta1.PodDisruptionBudgetSpec{ - MinAvailable: &intstrMin, - Selector: &metav1.LabelSelector{MatchLabels: labels}, - }, - } - - return pdb -} - -// TestPDBOperations tests that a PDB will be add/updated/deleted correctly. -func TestPDBOperations(t *testing.T) { - ttl := 10 * time.Second - testPDBs := []*v1beta1.PodDisruptionBudget{ - makePDB("pdb0", "ns1", "uid0", map[string]string{"tkey1": "tval1"}, 3), - makePDB("pdb1", "ns1", "uid1", map[string]string{"tkey1": "tval1", "tkey2": "tval2"}, 1), - makePDB("pdb2", "ns3", "uid2", map[string]string{"tkey3": "tval3", "tkey2": "tval2"}, 10), - } - updatedPDBs := []*v1beta1.PodDisruptionBudget{ - makePDB("pdb0", "ns1", "uid0", map[string]string{"tkey4": "tval4"}, 8), - makePDB("pdb1", "ns1", "uid1", map[string]string{"tkey1": "tval1"}, 1), - makePDB("pdb2", "ns3", "uid2", map[string]string{"tkey3": "tval3", "tkey1": "tval1", "tkey2": "tval2"}, 10), - } - tests := []struct { - pdbsToAdd []*v1beta1.PodDisruptionBudget - pdbsToUpdate []*v1beta1.PodDisruptionBudget - pdbsToDelete []*v1beta1.PodDisruptionBudget - expectedPDBs []*v1beta1.PodDisruptionBudget // Expected PDBs after all operations - }{ - { - pdbsToAdd: []*v1beta1.PodDisruptionBudget{testPDBs[0]}, - pdbsToUpdate: []*v1beta1.PodDisruptionBudget{testPDBs[0], testPDBs[1], testPDBs[0]}, - expectedPDBs: []*v1beta1.PodDisruptionBudget{testPDBs[0], testPDBs[1]}, // both will be in the cache as they have different names - }, - { - pdbsToAdd: []*v1beta1.PodDisruptionBudget{testPDBs[0]}, - pdbsToUpdate: []*v1beta1.PodDisruptionBudget{testPDBs[0], updatedPDBs[0]}, - expectedPDBs: []*v1beta1.PodDisruptionBudget{updatedPDBs[0]}, - }, - { - pdbsToAdd: []*v1beta1.PodDisruptionBudget{testPDBs[0], testPDBs[2]}, - pdbsToUpdate: []*v1beta1.PodDisruptionBudget{testPDBs[0], updatedPDBs[0]}, - pdbsToDelete: []*v1beta1.PodDisruptionBudget{testPDBs[0]}, - expectedPDBs: []*v1beta1.PodDisruptionBudget{testPDBs[2]}, - }, - } - - for _, test := range tests { - cache := newSchedulerCache(ttl, time.Second, nil) - for _, pdbToAdd := range test.pdbsToAdd { - if err := cache.AddPDB(pdbToAdd); err != nil { - t.Fatalf("AddPDB failed: %v", err) - } - } - - for i := range test.pdbsToUpdate { - if i == 0 { - continue - } - if err := cache.UpdatePDB(test.pdbsToUpdate[i-1], test.pdbsToUpdate[i]); err != nil { - t.Fatalf("UpdatePDB failed: %v", err) - } - } - - for _, pdb := range test.pdbsToDelete { - if err := cache.RemovePDB(pdb); err != nil { - t.Fatalf("RemovePDB failed: %v", err) - } - } - - cachedPDBs, err := cache.ListPDBs(labels.Everything()) - if err != nil { - t.Fatalf("ListPDBs failed: %v", err) - } - if len(cachedPDBs) != len(test.expectedPDBs) { - t.Errorf("Expected %d PDBs, got %d", len(test.expectedPDBs), len(cachedPDBs)) - } - for _, pdb := range test.expectedPDBs { - found := false - // find it among the cached ones - for _, cpdb := range cachedPDBs { - if pdb.UID == cpdb.UID { - found = true - if !reflect.DeepEqual(pdb, cpdb) { - t.Errorf("%v is not equal to %v", pdb, cpdb) - } - break - } - } - if !found { - t.Errorf("PDB with uid '%v' was not found in the cache.", pdb.UID) - } - - } - } -} diff --git a/pkg/scheduler/cache/interface.go b/pkg/scheduler/cache/interface.go index 2f61d541dd4..14aa485f89b 100644 --- a/pkg/scheduler/cache/interface.go +++ b/pkg/scheduler/cache/interface.go @@ -18,7 +18,6 @@ package cache import ( "k8s.io/api/core/v1" - policy "k8s.io/api/policy/v1beta1" "k8s.io/apimachinery/pkg/labels" ) @@ -97,18 +96,6 @@ type Cache interface { // RemoveNode removes overall information about node. RemoveNode(node *v1.Node) error - // AddPDB adds a PodDisruptionBudget object to the cache. - AddPDB(pdb *policy.PodDisruptionBudget) error - - // UpdatePDB updates a PodDisruptionBudget object in the cache. - UpdatePDB(oldPDB, newPDB *policy.PodDisruptionBudget) error - - // RemovePDB removes a PodDisruptionBudget object from the cache. - RemovePDB(pdb *policy.PodDisruptionBudget) error - - // List lists all cached PDBs matching the selector. - ListPDBs(selector labels.Selector) ([]*policy.PodDisruptionBudget, error) - // UpdateNodeNameToInfoMap updates the passed infoMap to the current contents of Cache. // The node info contains aggregated information of pods scheduled (including assumed to be) // on this node. @@ -131,5 +118,4 @@ type Cache interface { type Snapshot struct { AssumedPods map[string]bool Nodes map[string]*NodeInfo - Pdbs map[string]*policy.PodDisruptionBudget } diff --git a/pkg/scheduler/core/extender_test.go b/pkg/scheduler/core/extender_test.go index 1bbcf7cf306..5398b023607 100644 --- a/pkg/scheduler/core/extender_test.go +++ b/pkg/scheduler/core/extender_test.go @@ -513,6 +513,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { extenders, nil, schedulertesting.FakePersistentVolumeClaimLister{}, + schedulertesting.FakePDBLister{}, false, false, schedulerapi.DefaultPercentageOfNodesToScore) diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index c48a5abea96..92bddc921ad 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -107,6 +107,7 @@ type genericScheduler struct { cachedNodeInfoMap map[string]*schedulercache.NodeInfo volumeBinder *volumebinder.VolumeBinder pvcLister corelisters.PersistentVolumeClaimLister + pdbLister algorithm.PDBLister disablePreemption bool percentageOfNodesToScore int32 } @@ -266,7 +267,7 @@ func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, // In this case, we should clean-up any existing nominated node name of the pod. return nil, nil, []*v1.Pod{pod}, nil } - pdbs, err := g.cache.ListPDBs(labels.Everything()) + pdbs, err := g.pdbLister.List(labels.Everything()) if err != nil { return nil, nil, nil, err } @@ -1146,6 +1147,7 @@ func NewGenericScheduler( extenders []algorithm.SchedulerExtender, volumeBinder *volumebinder.VolumeBinder, pvcLister corelisters.PersistentVolumeClaimLister, + pdbLister algorithm.PDBLister, alwaysCheckAllPredicates bool, disablePreemption bool, percentageOfNodesToScore int32, @@ -1162,6 +1164,7 @@ func NewGenericScheduler( cachedNodeInfoMap: make(map[string]*schedulercache.NodeInfo), volumeBinder: volumeBinder, pvcLister: pvcLister, + pdbLister: pdbLister, alwaysCheckAllPredicates: alwaysCheckAllPredicates, disablePreemption: disablePreemption, percentageOfNodesToScore: percentageOfNodesToScore, diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go index e050a859360..eed2460cfaa 100644 --- a/pkg/scheduler/core/generic_scheduler_test.go +++ b/pkg/scheduler/core/generic_scheduler_test.go @@ -425,6 +425,7 @@ func TestGenericScheduler(t *testing.T) { []algorithm.SchedulerExtender{}, nil, pvcLister, + schedulertesting.FakePDBLister{}, test.alwaysCheckAllPredicates, false, schedulerapi.DefaultPercentageOfNodesToScore) @@ -457,7 +458,7 @@ func makeScheduler(predicates map[string]algorithm.FitPredicate, nodes []*v1.Nod algorithm.EmptyPredicateMetadataProducer, prioritizers, algorithm.EmptyPriorityMetadataProducer, - nil, nil, nil, false, false, + nil, nil, nil, nil, false, false, schedulerapi.DefaultPercentageOfNodesToScore) cache.UpdateNodeNameToInfoMap(s.(*genericScheduler).cachedNodeInfoMap) return s.(*genericScheduler) @@ -1381,6 +1382,7 @@ func TestPreempt(t *testing.T) { extenders, nil, schedulertesting.FakePersistentVolumeClaimLister{}, + schedulertesting.FakePDBLister{}, false, false, schedulerapi.DefaultPercentageOfNodesToScore) @@ -1495,6 +1497,7 @@ func TestCacheInvalidationRace(t *testing.T) { // Set up the scheduler. prioritizers := []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}} pvcLister := schedulertesting.FakePersistentVolumeClaimLister([]*v1.PersistentVolumeClaim{}) + pdbLister := schedulertesting.FakePDBLister{} scheduler := NewGenericScheduler( mockCache, eCache, @@ -1503,7 +1506,8 @@ func TestCacheInvalidationRace(t *testing.T) { algorithm.EmptyPredicateMetadataProducer, prioritizers, algorithm.EmptyPriorityMetadataProducer, - nil, nil, pvcLister, true, false, + nil, nil, pvcLister, pdbLister, + true, false, schedulerapi.DefaultPercentageOfNodesToScore) // First scheduling attempt should fail. @@ -1576,6 +1580,7 @@ func TestCacheInvalidationRace2(t *testing.T) { // Set up the scheduler. prioritizers := []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}} pvcLister := schedulertesting.FakePersistentVolumeClaimLister([]*v1.PersistentVolumeClaim{}) + pdbLister := schedulertesting.FakePDBLister{} scheduler := NewGenericScheduler( cache, eCache, @@ -1584,7 +1589,7 @@ func TestCacheInvalidationRace2(t *testing.T) { algorithm.EmptyPredicateMetadataProducer, prioritizers, algorithm.EmptyPriorityMetadataProducer, - nil, nil, pvcLister, true, false, + nil, nil, pvcLister, pdbLister, true, false, schedulerapi.DefaultPercentageOfNodesToScore) // First scheduling attempt should fail. diff --git a/pkg/scheduler/factory/cache_comparer.go b/pkg/scheduler/factory/cache_comparer.go index fadd7be2c99..4768b79ef95 100644 --- a/pkg/scheduler/factory/cache_comparer.go +++ b/pkg/scheduler/factory/cache_comparer.go @@ -22,10 +22,8 @@ import ( "github.com/golang/glog" "k8s.io/api/core/v1" - policy "k8s.io/api/policy/v1beta1" "k8s.io/apimachinery/pkg/labels" corelisters "k8s.io/client-go/listers/core/v1" - v1beta1 "k8s.io/client-go/listers/policy/v1beta1" schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" "k8s.io/kubernetes/pkg/scheduler/core" ) @@ -33,7 +31,6 @@ import ( type cacheComparer struct { nodeLister corelisters.NodeLister podLister corelisters.PodLister - pdbLister v1beta1.PodDisruptionBudgetLister cache schedulercache.Cache podQueue core.SchedulingQueue @@ -54,11 +51,6 @@ func (c *cacheComparer) Compare() error { return err } - pdbs, err := c.pdbLister.List(labels.Everything()) - if err != nil { - return err - } - snapshot := c.cache.Snapshot() waitingPods := c.podQueue.WaitingPods() @@ -71,10 +63,6 @@ func (c *cacheComparer) Compare() error { glog.Warningf("cache mismatch: missed pods: %s; redundant pods: %s", missed, redundant) } - if missed, redundant := c.ComparePdbs(pdbs, snapshot.Pdbs); len(missed)+len(redundant) != 0 { - glog.Warningf("cache mismatch: missed pdbs: %s; redundant pdbs: %s", missed, redundant) - } - return nil } @@ -114,20 +102,6 @@ func (c compareStrategy) ComparePods(pods, waitingPods []*v1.Pod, nodeinfos map[ return compareStrings(actual, cached) } -func (c compareStrategy) ComparePdbs(pdbs []*policy.PodDisruptionBudget, pdbCache map[string]*policy.PodDisruptionBudget) (missed, redundant []string) { - actual := []string{} - for _, pdb := range pdbs { - actual = append(actual, string(pdb.UID)) - } - - cached := []string{} - for pdbUID := range pdbCache { - cached = append(cached, pdbUID) - } - - return compareStrings(actual, cached) -} - func compareStrings(actual, cached []string) (missed, redundant []string) { missed, redundant = []string{}, []string{} diff --git a/pkg/scheduler/factory/cache_comparer_test.go b/pkg/scheduler/factory/cache_comparer_test.go index 2a4ed53df4e..859c53b391c 100644 --- a/pkg/scheduler/factory/cache_comparer_test.go +++ b/pkg/scheduler/factory/cache_comparer_test.go @@ -21,7 +21,6 @@ import ( "testing" "k8s.io/api/core/v1" - policy "k8s.io/api/policy/v1beta1" "k8s.io/apimachinery/pkg/types" schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" ) @@ -191,68 +190,3 @@ func testComparePods(actual, cached, queued, missing, redundant []string, t *tes t.Errorf("redundant expected to be %s; got %s", redundant, r) } } - -func TestComparePdbs(t *testing.T) { - tests := []struct { - name string - actual []string - cached []string - missing []string - redundant []string - }{ - { - name: "redundant cache value", - actual: []string{"foo", "bar"}, - cached: []string{"bar", "foo", "foobar"}, - missing: []string{}, - redundant: []string{"foobar"}, - }, - { - name: "missing cache value", - actual: []string{"foo", "bar", "foobar"}, - cached: []string{"bar", "foo"}, - missing: []string{"foobar"}, - redundant: []string{}, - }, - { - name: "correct cache", - actual: []string{"foo", "bar", "foobar"}, - cached: []string{"bar", "foobar", "foo"}, - missing: []string{}, - redundant: []string{}, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - testComparePdbs(test.actual, test.cached, test.missing, test.redundant, t) - }) - } -} - -func testComparePdbs(actual, cached, missing, redundant []string, t *testing.T) { - compare := compareStrategy{} - pdbs := []*policy.PodDisruptionBudget{} - for _, uid := range actual { - pdb := &policy.PodDisruptionBudget{} - pdb.UID = types.UID(uid) - pdbs = append(pdbs, pdb) - } - - cache := make(map[string]*policy.PodDisruptionBudget) - for _, uid := range cached { - pdb := &policy.PodDisruptionBudget{} - pdb.UID = types.UID(uid) - cache[uid] = pdb - } - - m, r := compare.ComparePdbs(pdbs, cache) - - if !reflect.DeepEqual(m, missing) { - t.Errorf("missing expected to be %s; got %s", missing, m) - } - - if !reflect.DeepEqual(r, redundant) { - t.Errorf("redundant expected to be %s; got %s", redundant, r) - } -} diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index 1995f84df12..28ea19357d4 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -28,7 +28,6 @@ import ( "github.com/golang/glog" "k8s.io/api/core/v1" - "k8s.io/api/policy/v1beta1" storagev1 "k8s.io/api/storage/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -258,15 +257,6 @@ func NewConfigFactory(args *ConfigFactoryArgs) scheduler.Configurator { ) c.nodeLister = args.NodeInformer.Lister() - args.PdbInformer.Informer().AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: c.addPDBToCache, - UpdateFunc: c.updatePDBInCache, - DeleteFunc: c.deletePDBFromCache, - }, - ) - c.pdbLister = args.PdbInformer.Lister() - // On add and delete of PVs, it will affect equivalence cache items // related to persistent volume args.PvInformer.Informer().AddEventHandler( @@ -320,7 +310,6 @@ func NewConfigFactory(args *ConfigFactoryArgs) scheduler.Configurator { comparer := &cacheComparer{ podLister: args.PodInformer.Lister(), nodeLister: args.NodeInformer.Lister(), - pdbLister: args.PdbInformer.Lister(), cache: c.schedulerCache, podQueue: c.podQueue, } @@ -1003,56 +992,6 @@ func (c *configFactory) deleteNodeFromCache(obj interface{}) { } } -func (c *configFactory) addPDBToCache(obj interface{}) { - pdb, ok := obj.(*v1beta1.PodDisruptionBudget) - if !ok { - glog.Errorf("cannot convert to *v1beta1.PodDisruptionBudget: %v", obj) - return - } - - if err := c.schedulerCache.AddPDB(pdb); err != nil { - glog.Errorf("scheduler cache AddPDB failed: %v", err) - } -} - -func (c *configFactory) updatePDBInCache(oldObj, newObj interface{}) { - oldPDB, ok := oldObj.(*v1beta1.PodDisruptionBudget) - if !ok { - glog.Errorf("cannot convert oldObj to *v1beta1.PodDisruptionBudget: %v", oldObj) - return - } - newPDB, ok := newObj.(*v1beta1.PodDisruptionBudget) - if !ok { - glog.Errorf("cannot convert newObj to *v1beta1.PodDisruptionBudget: %v", newObj) - return - } - - if err := c.schedulerCache.UpdatePDB(oldPDB, newPDB); err != nil { - glog.Errorf("scheduler cache UpdatePDB failed: %v", err) - } -} - -func (c *configFactory) deletePDBFromCache(obj interface{}) { - var pdb *v1beta1.PodDisruptionBudget - switch t := obj.(type) { - case *v1beta1.PodDisruptionBudget: - pdb = t - case cache.DeletedFinalStateUnknown: - var ok bool - pdb, ok = t.Obj.(*v1beta1.PodDisruptionBudget) - if !ok { - glog.Errorf("cannot convert to *v1beta1.PodDisruptionBudget: %v", t.Obj) - return - } - default: - glog.Errorf("cannot convert to *v1beta1.PodDisruptionBudget: %v", t) - return - } - if err := c.schedulerCache.RemovePDB(pdb); err != nil { - glog.Errorf("scheduler cache RemovePDB failed: %v", err) - } -} - // Create creates a scheduler with the default algorithm provider. func (c *configFactory) Create() (*scheduler.Config, error) { return c.CreateFromProvider(DefaultProvider) @@ -1203,6 +1142,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders, c.volumeBinder, c.pVCLister, + c.pdbLister, c.alwaysCheckAllPredicates, c.disablePreemption, c.percentageOfNodesToScore, @@ -1281,6 +1221,7 @@ func (c *configFactory) getPluginArgs() (*PluginFactoryArgs, error) { ReplicaSetLister: c.replicaSetLister, StatefulSetLister: c.statefulSetLister, NodeLister: &nodeLister{c.nodeLister}, + PDBLister: c.pdbLister, NodeInfo: &predicates.CachedNodeInfo{NodeLister: c.nodeLister}, PVInfo: &predicates.CachedPersistentVolumeInfo{PersistentVolumeLister: c.pVLister}, PVCInfo: &predicates.CachedPersistentVolumeClaimInfo{PersistentVolumeClaimLister: c.pVCLister}, diff --git a/pkg/scheduler/factory/plugins.go b/pkg/scheduler/factory/plugins.go index f9a668b4a9b..8c586bb978c 100644 --- a/pkg/scheduler/factory/plugins.go +++ b/pkg/scheduler/factory/plugins.go @@ -41,6 +41,7 @@ type PluginFactoryArgs struct { ReplicaSetLister algorithm.ReplicaSetLister StatefulSetLister algorithm.StatefulSetLister NodeLister algorithm.NodeLister + PDBLister algorithm.PDBLister NodeInfo predicates.NodeInfo PVInfo predicates.PersistentVolumeInfo PVCInfo predicates.PersistentVolumeClaimInfo diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index e950141ae43..a5158bb371b 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -561,6 +561,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache. []algorithm.SchedulerExtender{}, nil, schedulertesting.FakePersistentVolumeClaimLister{}, + schedulertesting.FakePDBLister{}, false, false, api.DefaultPercentageOfNodesToScore) @@ -611,6 +612,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc []algorithm.SchedulerExtender{}, nil, schedulertesting.FakePersistentVolumeClaimLister{}, + schedulertesting.FakePDBLister{}, false, false, api.DefaultPercentageOfNodesToScore) diff --git a/pkg/scheduler/testing/fake_cache.go b/pkg/scheduler/testing/fake_cache.go index b9f2bdf61a5..c7a4d1bfde6 100644 --- a/pkg/scheduler/testing/fake_cache.go +++ b/pkg/scheduler/testing/fake_cache.go @@ -18,7 +18,6 @@ package testing import ( "k8s.io/api/core/v1" - policy "k8s.io/api/policy/v1beta1" "k8s.io/apimachinery/pkg/labels" schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" ) @@ -79,20 +78,6 @@ func (f *FakeCache) UpdateNodeNameToInfoMap(infoMap map[string]*schedulercache.N return nil } -// AddPDB is a fake method for testing. -func (f *FakeCache) AddPDB(pdb *policy.PodDisruptionBudget) error { return nil } - -// UpdatePDB is a fake method for testing. -func (f *FakeCache) UpdatePDB(oldPDB, newPDB *policy.PodDisruptionBudget) error { return nil } - -// RemovePDB is a fake method for testing. -func (f *FakeCache) RemovePDB(pdb *policy.PodDisruptionBudget) error { return nil } - -// ListPDBs is a fake method for testing. -func (f *FakeCache) ListPDBs(selector labels.Selector) ([]*policy.PodDisruptionBudget, error) { - return nil, nil -} - // List is a fake method for testing. func (f *FakeCache) List(s labels.Selector) ([]*v1.Pod, error) { return nil, nil } diff --git a/pkg/scheduler/testing/fake_lister.go b/pkg/scheduler/testing/fake_lister.go index 5b67a0cade8..8468aa25b64 100644 --- a/pkg/scheduler/testing/fake_lister.go +++ b/pkg/scheduler/testing/fake_lister.go @@ -21,6 +21,7 @@ import ( apps "k8s.io/api/apps/v1" "k8s.io/api/core/v1" + policy "k8s.io/api/policy/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" corelisters "k8s.io/client-go/listers/core/v1" @@ -214,3 +215,11 @@ func (f *fakePersistentVolumeClaimNamespaceLister) Get(name string) (*v1.Persist func (f fakePersistentVolumeClaimNamespaceLister) List(selector labels.Selector) (ret []*v1.PersistentVolumeClaim, err error) { return nil, fmt.Errorf("not implemented") } + +// FakePDBLister implements PDBLister on a slice of PodDisruptionBudgets for test purposes. +type FakePDBLister []*policy.PodDisruptionBudget + +// List returns a list of PodDisruptionBudgets. +func (f FakePDBLister) List(labels.Selector) ([]*policy.PodDisruptionBudget, error) { + return f, nil +} diff --git a/test/integration/scheduler/preemption_test.go b/test/integration/scheduler/preemption_test.go index 2c85fd75137..27dd50cdf42 100644 --- a/test/integration/scheduler/preemption_test.go +++ b/test/integration/scheduler/preemption_test.go @@ -842,8 +842,8 @@ func TestPDBInPreemption(t *testing.T) { t.Fatalf("Failed to create PDB: %v", err) } } - // Wait for PDBs to show up in the scheduler's cache and become stable. - if err := waitCachedPDBsStable(context, test.pdbs, test.pdbPodNum); err != nil { + // Wait for PDBs to become stable. + if err := waitForPDBsStable(context, test.pdbs, test.pdbPodNum); err != nil { t.Fatalf("Not all pdbs are stable in the cache: %v", err) } diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index 608f679b9fb..3151768454a 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -20,19 +20,13 @@ package scheduler import ( "fmt" - "reflect" "testing" "time" "k8s.io/api/core/v1" - policy "k8s.io/api/policy/v1beta1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/diff" - "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" @@ -671,95 +665,6 @@ func TestAllocatable(t *testing.T) { } } -// TestPDBCache verifies that scheduler cache works as expected when handling -// PodDisruptionBudget. -func TestPDBCache(t *testing.T) { - context := initTest(t, "pdbcache") - defer cleanupTest(t, context) - - intstrMin := intstr.FromInt(4) - pdb := &policy.PodDisruptionBudget{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: context.ns.Name, - Name: "test-pdb", - UID: types.UID("test-pdb-uid"), - Labels: map[string]string{"tkey1": "tval1", "tkey2": "tval2"}, - }, - Spec: policy.PodDisruptionBudgetSpec{ - MinAvailable: &intstrMin, - Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"tkey": "tvalue"}}, - }, - } - - createdPDB, err := context.clientSet.PolicyV1beta1().PodDisruptionBudgets(context.ns.Name).Create(pdb) - if err != nil { - t.Errorf("Failed to create PDB: %v", err) - } - // Wait for PDB to show up in the scheduler's cache. - if err = wait.Poll(time.Second, 15*time.Second, func() (bool, error) { - cachedPDBs, err := context.scheduler.Config().SchedulerCache.ListPDBs(labels.Everything()) - if err != nil { - t.Errorf("Error while polling for PDB: %v", err) - return false, err - } - return len(cachedPDBs) > 0, err - }); err != nil { - t.Fatalf("No PDB was added to the cache: %v", err) - } - // Read PDB from the cache and compare it. - cachedPDBs, err := context.scheduler.Config().SchedulerCache.ListPDBs(labels.Everything()) - if len(cachedPDBs) != 1 { - t.Fatalf("Expected to have 1 pdb in cache, but found %d.", len(cachedPDBs)) - } - if !reflect.DeepEqual(createdPDB, cachedPDBs[0]) { - t.Errorf("Got different PDB than expected.\nDifference detected on:\n%s", diff.ObjectReflectDiff(createdPDB, cachedPDBs[0])) - } - - // Update PDB and change its labels. - pdbCopy := *cachedPDBs[0] - pdbCopy.Labels = map[string]string{} - updatedPDB, err := context.clientSet.PolicyV1beta1().PodDisruptionBudgets(context.ns.Name).Update(&pdbCopy) - if err != nil { - t.Errorf("Failed to update PDB: %v", err) - } - // Wait for PDB to be updated in the scheduler's cache. - if err = wait.Poll(time.Second, 15*time.Second, func() (bool, error) { - cachedPDBs, err := context.scheduler.Config().SchedulerCache.ListPDBs(labels.Everything()) - if err != nil { - t.Errorf("Error while polling for PDB: %v", err) - return false, err - } - return len(cachedPDBs[0].Labels) == 0, err - }); err != nil { - t.Fatalf("No PDB was updated in the cache: %v", err) - } - // Read PDB from the cache and compare it. - cachedPDBs, err = context.scheduler.Config().SchedulerCache.ListPDBs(labels.Everything()) - if len(cachedPDBs) != 1 { - t.Errorf("Expected to have 1 pdb in cache, but found %d.", len(cachedPDBs)) - } - if !reflect.DeepEqual(updatedPDB, cachedPDBs[0]) { - t.Errorf("Got different PDB than expected.\nDifference detected on:\n%s", diff.ObjectReflectDiff(updatedPDB, cachedPDBs[0])) - } - - // Delete PDB. - err = context.clientSet.PolicyV1beta1().PodDisruptionBudgets(context.ns.Name).Delete(pdb.Name, &metav1.DeleteOptions{}) - if err != nil { - t.Errorf("Failed to delete PDB: %v", err) - } - // Wait for PDB to be deleted from the scheduler's cache. - if err = wait.Poll(time.Second, 15*time.Second, func() (bool, error) { - cachedPDBs, err := context.scheduler.Config().SchedulerCache.ListPDBs(labels.Everything()) - if err != nil { - t.Errorf("Error while polling for PDB: %v", err) - return false, err - } - return len(cachedPDBs) == 0, err - }); err != nil { - t.Errorf("No PDB was deleted from the cache: %v", err) - } -} - // TestSchedulerInformers tests that scheduler receives informer events and updates its cache when // pods are scheduled by other schedulers. func TestSchedulerInformers(t *testing.T) { diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go index b17ca9fcf96..2ed1d5c45c1 100644 --- a/test/integration/scheduler/util.go +++ b/test/integration/scheduler/util.go @@ -631,20 +631,20 @@ func waitForPodUnschedulable(cs clientset.Interface, pod *v1.Pod) error { return waitForPodUnschedulableWithTimeout(cs, pod, 30*time.Second) } -// waitCachedPDBsStable waits for PDBs in scheduler cache to have "CurrentHealthy" status equal to +// waitForPDBsStable waits for PDBs to have "CurrentHealthy" status equal to // the expected values. -func waitCachedPDBsStable(context *TestContext, pdbs []*policy.PodDisruptionBudget, pdbPodNum []int32) error { +func waitForPDBsStable(context *TestContext, pdbs []*policy.PodDisruptionBudget, pdbPodNum []int32) error { return wait.Poll(time.Second, 60*time.Second, func() (bool, error) { - cachedPDBs, err := context.scheduler.Config().SchedulerCache.ListPDBs(labels.Everything()) + pdbList, err := context.clientSet.PolicyV1beta1().PodDisruptionBudgets(context.ns.Name).List(metav1.ListOptions{}) if err != nil { return false, err } - if len(cachedPDBs) != len(pdbs) { + if len(pdbList.Items) != len(pdbs) { return false, nil } for i, pdb := range pdbs { found := false - for _, cpdb := range cachedPDBs { + for _, cpdb := range pdbList.Items { if pdb.Name == cpdb.Name && pdb.Namespace == cpdb.Namespace { found = true if cpdb.Status.CurrentHealthy != pdbPodNum[i] {