Merge pull request #53914 from bsalamat/pdb
Automatic merge from submit-queue (batch tested with PRs 53903, 53914, 54374). If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

Add PodDisruptionBudget to scheduler cache.

**What this PR does / why we need it**: This is the first step toward supporting PodDisruptionBudget during preemption. This PR adds PDBs to the scheduler cache.

**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes #

**Special notes for your reviewer**: None

**Release note**:

```release-note
Add PodDisruptionBudget to scheduler cache.
```

ref/ #53913
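For context on where this is headed, here is a minimal sketch (not part of this PR) of how a later preemption pass might consult cached PDBs through the `ListPDBs` method added below. The helper name `pdbsViolatedByEviction` and the surrounding logic are illustrative assumptions, not code from this change:

```go
package example // illustrative sketch only, not part of this PR

import (
	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

// pdbsViolatedByEviction (hypothetical) counts how many candidate victims are
// covered by a PDB that currently allows no further disruptions. A real
// preemption implementation would weigh this when choosing victims.
func pdbsViolatedByEviction(cache schedulercache.Cache, victims []*v1.Pod) (int, error) {
	pdbs, err := cache.ListPDBs(labels.Everything())
	if err != nil {
		return 0, err
	}
	violations := 0
	for _, pdb := range pdbs {
		selector, err := metav1.LabelSelectorAsSelector(pdb.Spec.Selector)
		if err != nil {
			continue
		}
		for _, pod := range victims {
			if pod.Namespace == pdb.Namespace &&
				selector.Matches(labels.Set(pod.Labels)) &&
				pdb.Status.PodDisruptionsAllowed <= 0 {
				violations++
			}
		}
	}
	return violations, nil
}
```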
This commit is contained in commit 6a444673b9.
@@ -39,6 +39,7 @@ go_library(
         "//vendor/k8s.io/client-go/informers/apps/v1beta1:go_default_library",
         "//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
         "//vendor/k8s.io/client-go/informers/extensions/v1beta1:go_default_library",
+        "//vendor/k8s.io/client-go/informers/policy/v1beta1:go_default_library",
         "//vendor/k8s.io/client-go/kubernetes:go_default_library",
         "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
         "//vendor/k8s.io/client-go/rest:go_default_library",

@@ -24,6 +24,7 @@ import (
     appsinformers "k8s.io/client-go/informers/apps/v1beta1"
     coreinformers "k8s.io/client-go/informers/core/v1"
     extensionsinformers "k8s.io/client-go/informers/extensions/v1beta1"
+    policyinformers "k8s.io/client-go/informers/policy/v1beta1"
     "k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options"

     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -86,6 +87,7 @@ func CreateScheduler(
     replicaSetInformer extensionsinformers.ReplicaSetInformer,
     statefulSetInformer appsinformers.StatefulSetInformer,
     serviceInformer coreinformers.ServiceInformer,
+    pdbInformer policyinformers.PodDisruptionBudgetInformer,
     recorder record.EventRecorder,
 ) (*scheduler.Scheduler, error) {
     configurator := factory.NewConfigFactory(
@@ -99,6 +101,7 @@ func CreateScheduler(
         replicaSetInformer,
         statefulSetInformer,
         serviceInformer,
+        pdbInformer,
         s.HardPodAffinitySymmetricWeight,
         utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
     )

@@ -95,6 +95,7 @@ func Run(s *options.SchedulerServer) error {
         informerFactory.Extensions().V1beta1().ReplicaSets(),
         informerFactory.Apps().V1beta1().StatefulSets(),
         informerFactory.Core().V1().Services(),
+        informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
         recorder,
     )
     if err != nil {

@@ -362,7 +362,8 @@ func ClusterRoles() []rbac.ClusterRole {
             rbac.NewRule(Read...).Groups(legacyGroup).Resources("services", "replicationcontrollers").RuleOrDie(),
             rbac.NewRule(Read...).Groups(appsGroup, extensionsGroup).Resources("replicasets").RuleOrDie(),
             rbac.NewRule(Read...).Groups(appsGroup).Resources("statefulsets").RuleOrDie(),
-            // things that pods use
+            // things that pods use or applies to them
+            rbac.NewRule(Read...).Groups(policyGroup).Resources("poddisruptionbudgets").RuleOrDie(),
             rbac.NewRule(Read...).Groups(legacyGroup).Resources("persistentvolumeclaims", "persistentvolumes").RuleOrDie(),
         },
     },

@@ -667,6 +667,14 @@ items:
     - get
     - list
    - watch
+  - apiGroups:
+    - policy
+    resources:
+    - poddisruptionbudgets
+    verbs:
+    - get
+    - list
+    - watch
   - apiGroups:
     - ""
     resources:

@@ -505,6 +505,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
         informerFactory.Extensions().V1beta1().ReplicaSets(),
         informerFactory.Apps().V1beta1().StatefulSets(),
         informerFactory.Core().V1().Services(),
+        informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
         v1.DefaultHardPodAffinitySymmetricWeight,
         enableEquivalenceCache,
     ).CreateFromConfig(policy); err != nil {

@@ -28,6 +28,7 @@ go_library(
         "//plugin/pkg/scheduler/util:go_default_library",
         "//vendor/github.com/golang/glog:go_default_library",
         "//vendor/k8s.io/api/core/v1:go_default_library",
+        "//vendor/k8s.io/api/policy/v1beta1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
@@ -40,10 +41,12 @@ go_library(
         "//vendor/k8s.io/client-go/informers/apps/v1beta1:go_default_library",
         "//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
         "//vendor/k8s.io/client-go/informers/extensions/v1beta1:go_default_library",
+        "//vendor/k8s.io/client-go/informers/policy/v1beta1:go_default_library",
         "//vendor/k8s.io/client-go/kubernetes:go_default_library",
         "//vendor/k8s.io/client-go/listers/apps/v1beta1:go_default_library",
         "//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
         "//vendor/k8s.io/client-go/listers/extensions/v1beta1:go_default_library",
+        "//vendor/k8s.io/client-go/listers/policy/v1beta1:go_default_library",
         "//vendor/k8s.io/client-go/tools/cache:go_default_library",
     ],
 )

@@ -27,6 +27,7 @@ import (
     "github.com/golang/glog"

     "k8s.io/api/core/v1"
+    "k8s.io/api/policy/v1beta1"
     "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -39,10 +40,12 @@ import (
     appsinformers "k8s.io/client-go/informers/apps/v1beta1"
     coreinformers "k8s.io/client-go/informers/core/v1"
     extensionsinformers "k8s.io/client-go/informers/extensions/v1beta1"
+    policyinformers "k8s.io/client-go/informers/policy/v1beta1"
     clientset "k8s.io/client-go/kubernetes"
     appslisters "k8s.io/client-go/listers/apps/v1beta1"
     corelisters "k8s.io/client-go/listers/core/v1"
     extensionslisters "k8s.io/client-go/listers/extensions/v1beta1"
+    policylisters "k8s.io/client-go/listers/policy/v1beta1"
     "k8s.io/client-go/tools/cache"
     "k8s.io/kubernetes/pkg/api/helper"
     podutil "k8s.io/kubernetes/pkg/api/v1/pod"
@@ -93,6 +96,8 @@ type configFactory struct {
     replicaSetLister extensionslisters.ReplicaSetLister
     // a means to list all statefulsets
     statefulSetLister appslisters.StatefulSetLister
+    // a means to list all PodDisruptionBudgets
+    pdbLister policylisters.PodDisruptionBudgetLister

     // Close this to stop all reflectors
     StopEverything chan struct{}
@@ -130,6 +135,7 @@ func NewConfigFactory(
     replicaSetInformer extensionsinformers.ReplicaSetInformer,
     statefulSetInformer appsinformers.StatefulSetInformer,
     serviceInformer coreinformers.ServiceInformer,
+    pdbInformer policyinformers.PodDisruptionBudgetInformer,
     hardPodAffinitySymmetricWeight int,
     enableEquivalenceClassCache bool,
 ) scheduler.Configurator {
@@ -146,6 +152,7 @@ func NewConfigFactory(
         controllerLister: replicationControllerInformer.Lister(),
         replicaSetLister: replicaSetInformer.Lister(),
         statefulSetLister: statefulSetInformer.Lister(),
+        pdbLister: pdbInformer.Lister(),
         schedulerCache: schedulerCache,
         StopEverything: stopEverything,
         schedulerName: schedulerName,
@@ -220,6 +227,15 @@ func NewConfigFactory(
     )
     c.nodeLister = nodeInformer.Lister()

+    pdbInformer.Informer().AddEventHandler(
+        cache.ResourceEventHandlerFuncs{
+            AddFunc: c.addPDBToCache,
+            UpdateFunc: c.updatePDBInCache,
+            DeleteFunc: c.deletePDBFromCache,
+        },
+    )
+    c.pdbLister = pdbInformer.Lister()
+
     // On add and delete of PVs, it will affect equivalence cache items
     // related to persistent volume
     pvInformer.Informer().AddEventHandler(
@@ -654,6 +670,56 @@ func (c *configFactory) deleteNodeFromCache(obj interface{}) {
     }
 }

+func (c *configFactory) addPDBToCache(obj interface{}) {
+    pdb, ok := obj.(*v1beta1.PodDisruptionBudget)
+    if !ok {
+        glog.Errorf("cannot convert to *v1beta1.PodDisruptionBudget: %v", obj)
+        return
+    }
+
+    if err := c.schedulerCache.AddPDB(pdb); err != nil {
+        glog.Errorf("scheduler cache AddPDB failed: %v", err)
+    }
+}
+
+func (c *configFactory) updatePDBInCache(oldObj, newObj interface{}) {
+    oldPDB, ok := oldObj.(*v1beta1.PodDisruptionBudget)
+    if !ok {
+        glog.Errorf("cannot convert oldObj to *v1beta1.PodDisruptionBudget: %v", oldObj)
+        return
+    }
+    newPDB, ok := newObj.(*v1beta1.PodDisruptionBudget)
+    if !ok {
+        glog.Errorf("cannot convert newObj to *v1beta1.PodDisruptionBudget: %v", newObj)
+        return
+    }
+
+    if err := c.schedulerCache.UpdatePDB(oldPDB, newPDB); err != nil {
+        glog.Errorf("scheduler cache UpdatePDB failed: %v", err)
+    }
+}
+
+func (c *configFactory) deletePDBFromCache(obj interface{}) {
+    var pdb *v1beta1.PodDisruptionBudget
+    switch t := obj.(type) {
+    case *v1beta1.PodDisruptionBudget:
+        pdb = t
+    case cache.DeletedFinalStateUnknown:
+        var ok bool
+        pdb, ok = t.Obj.(*v1beta1.PodDisruptionBudget)
+        if !ok {
+            glog.Errorf("cannot convert to *v1beta1.PodDisruptionBudget: %v", t.Obj)
+            return
+        }
+    default:
+        glog.Errorf("cannot convert to *v1beta1.PodDisruptionBudget: %v", t)
+        return
+    }
+    if err := c.schedulerCache.RemovePDB(pdb); err != nil {
+        glog.Errorf("scheduler cache RemovePDB failed: %v", err)
+    }
+}
+
 // Create creates a scheduler with the default algorithm provider.
 func (f *configFactory) Create() (*scheduler.Config, error) {
     return f.CreateFromProvider(DefaultProvider)

@@ -64,6 +64,7 @@ func TestCreate(t *testing.T) {
         informerFactory.Extensions().V1beta1().ReplicaSets(),
         informerFactory.Apps().V1beta1().StatefulSets(),
         informerFactory.Core().V1().Services(),
+        informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
         v1.DefaultHardPodAffinitySymmetricWeight,
         enableEquivalenceCache,
     )
@@ -96,6 +97,7 @@ func TestCreateFromConfig(t *testing.T) {
         informerFactory.Extensions().V1beta1().ReplicaSets(),
         informerFactory.Apps().V1beta1().StatefulSets(),
         informerFactory.Core().V1().Services(),
+        informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
         v1.DefaultHardPodAffinitySymmetricWeight,
         enableEquivalenceCache,
     )
@@ -155,6 +157,7 @@ func TestCreateFromConfigWithHardPodAffinitySymmetricWeight(t *testing.T) {
         informerFactory.Extensions().V1beta1().ReplicaSets(),
         informerFactory.Apps().V1beta1().StatefulSets(),
         informerFactory.Core().V1().Services(),
+        informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
         v1.DefaultHardPodAffinitySymmetricWeight,
         enableEquivalenceCache,
     )
@@ -215,6 +218,7 @@ func TestCreateFromEmptyConfig(t *testing.T) {
         informerFactory.Extensions().V1beta1().ReplicaSets(),
         informerFactory.Apps().V1beta1().StatefulSets(),
         informerFactory.Core().V1().Services(),
+        informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
         v1.DefaultHardPodAffinitySymmetricWeight,
         enableEquivalenceCache,
     )
@@ -272,6 +276,7 @@ func TestDefaultErrorFunc(t *testing.T) {
         informerFactory.Extensions().V1beta1().ReplicaSets(),
         informerFactory.Apps().V1beta1().StatefulSets(),
         informerFactory.Core().V1().Services(),
+        informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
         v1.DefaultHardPodAffinitySymmetricWeight,
         enableEquivalenceCache,
     )
@@ -385,6 +390,7 @@ func TestResponsibleForPod(t *testing.T) {
         informerFactory.Extensions().V1beta1().ReplicaSets(),
         informerFactory.Apps().V1beta1().StatefulSets(),
         informerFactory.Core().V1().Services(),
+        informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
         v1.DefaultHardPodAffinitySymmetricWeight,
         enableEquivalenceCache,
     )
@@ -400,6 +406,7 @@ func TestResponsibleForPod(t *testing.T) {
         informerFactory.Extensions().V1beta1().ReplicaSets(),
         informerFactory.Apps().V1beta1().StatefulSets(),
         informerFactory.Core().V1().Services(),
+        informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
         v1.DefaultHardPodAffinitySymmetricWeight,
         enableEquivalenceCache,
     )
@@ -470,6 +477,7 @@ func TestInvalidHardPodAffinitySymmetricWeight(t *testing.T) {
         informerFactory.Extensions().V1beta1().ReplicaSets(),
         informerFactory.Apps().V1beta1().StatefulSets(),
         informerFactory.Core().V1().Services(),
+        informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
         -1,
         enableEquivalenceCache,
     )
@@ -516,6 +524,7 @@ func TestInvalidFactoryArgs(t *testing.T) {
         informerFactory.Extensions().V1beta1().ReplicaSets(),
         informerFactory.Apps().V1beta1().StatefulSets(),
         informerFactory.Core().V1().Services(),
+        informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
         test.hardPodAffinitySymmetricWeight,
         enableEquivalenceCache,
     )

@@ -21,6 +21,7 @@ go_library(
         "//plugin/pkg/scheduler/util:go_default_library",
         "//vendor/github.com/golang/glog:go_default_library",
         "//vendor/k8s.io/api/core/v1:go_default_library",
+        "//vendor/k8s.io/api/policy/v1beta1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
@@ -38,9 +39,11 @@ go_test(
         "//plugin/pkg/scheduler/algorithm/priorities/util:go_default_library",
         "//plugin/pkg/scheduler/util:go_default_library",
         "//vendor/k8s.io/api/core/v1:go_default_library",
+        "//vendor/k8s.io/api/policy/v1beta1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
     ],
 )

@@ -26,6 +26,7 @@ import (
     "k8s.io/apimachinery/pkg/util/wait"

     "github.com/golang/glog"
+    policy "k8s.io/api/policy/v1beta1"
 )

 var (
@@ -55,6 +56,7 @@ type schedulerCache struct {
     // a map from pod key to podState.
     podStates map[string]*podState
     nodes map[string]*NodeInfo
+    pdbs map[string]*policy.PodDisruptionBudget
 }

 type podState struct {
@@ -74,6 +76,7 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul
         nodes: make(map[string]*NodeInfo),
         assumedPods: make(map[string]bool),
         podStates: make(map[string]*podState),
+        pdbs: make(map[string]*policy.PodDisruptionBudget),
     }
 }

@@ -382,6 +385,39 @@ func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
     return nil
 }

+func (cache *schedulerCache) AddPDB(pdb *policy.PodDisruptionBudget) error {
+    cache.mu.Lock()
+    defer cache.mu.Unlock()
+
+    // Unconditionally update cache.
+    cache.pdbs[pdb.Name] = pdb
+    return nil
+}
+
+func (cache *schedulerCache) UpdatePDB(oldPDB, newPDB *policy.PodDisruptionBudget) error {
+    return cache.AddPDB(newPDB)
+}
+
+func (cache *schedulerCache) RemovePDB(pdb *policy.PodDisruptionBudget) error {
+    cache.mu.Lock()
+    defer cache.mu.Unlock()
+
+    delete(cache.pdbs, pdb.Name)
+    return nil
+}
+
+func (cache *schedulerCache) ListPDBs(selector labels.Selector) ([]*policy.PodDisruptionBudget, error) {
+    cache.mu.Lock()
+    defer cache.mu.Unlock()
+    var pdbs []*policy.PodDisruptionBudget
+    for _, pdb := range cache.pdbs {
+        if selector.Matches(labels.Set(pdb.Labels)) {
+            pdbs = append(pdbs, pdb)
+        }
+    }
+    return pdbs, nil
+}
+
 func (cache *schedulerCache) run() {
     go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop)
 }

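One detail worth noting in the hunk above: `ListPDBs` matches the selector against the PDB's own metadata labels (`pdb.Labels`), not against `spec.selector`. A minimal sketch of that behavior, assuming it lives inside the `schedulercache` package (since `newSchedulerCache` is unexported); the function name and label values are illustrative only:

```go
package schedulercache // sketch assumes same package, because newSchedulerCache is unexported

import (
	"time"

	policy "k8s.io/api/policy/v1beta1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
)

// exampleListByMetadataLabels (hypothetical) shows that ListPDBs filters on a
// PDB's metadata labels; callers that need "PDBs whose spec.selector matches
// pod X" still have to evaluate spec.Selector themselves.
func exampleListByMetadataLabels() ([]*policy.PodDisruptionBudget, error) {
	c := newSchedulerCache(10*time.Second, time.Second, nil)
	if err := c.AddPDB(&policy.PodDisruptionBudget{
		ObjectMeta: metav1.ObjectMeta{Name: "pdb-a", Labels: map[string]string{"team": "alpha"}},
	}); err != nil {
		return nil, err
	}
	// Matches because the selector is applied to pdb.Labels, i.e. {"team": "alpha"}.
	return c.ListPDBs(labels.SelectorFromSet(labels.Set{"team": "alpha"}))
}
```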
@@ -24,9 +24,11 @@ import (
     "time"

     "k8s.io/api/core/v1"
+    "k8s.io/api/policy/v1beta1"
     "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/labels"
+    "k8s.io/apimachinery/pkg/util/intstr"
     v1helper "k8s.io/kubernetes/pkg/api/v1/helper"
     priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util"
     schedutil "k8s.io/kubernetes/plugin/pkg/scheduler/util"
@@ -900,3 +902,107 @@ func setupCacheWithAssumedPods(b *testing.B, podNum int, assumedTime time.Time)
     }
     return cache
 }
+
+func makePDB(name, namespace string, labels map[string]string, minAvailable int) *v1beta1.PodDisruptionBudget {
+    intstrMin := intstr.FromInt(minAvailable)
+    pdb := &v1beta1.PodDisruptionBudget{
+        ObjectMeta: metav1.ObjectMeta{
+            Namespace: namespace,
+            Name: name,
+            Labels: labels,
+        },
+        Spec: v1beta1.PodDisruptionBudgetSpec{
+            MinAvailable: &intstrMin,
+            Selector: &metav1.LabelSelector{MatchLabels: labels},
+        },
+    }
+
+    return pdb
+}
+
+// TestPDBOperations tests that a PDB will be add/updated/deleted correctly.
+func TestPDBOperations(t *testing.T) {
+    ttl := 10 * time.Second
+    testPDBs := []*v1beta1.PodDisruptionBudget{
+        makePDB("pdb0", "ns1", map[string]string{"tkey1": "tval1"}, 3),
+        makePDB("pdb1", "ns1", map[string]string{"tkey1": "tval1", "tkey2": "tval2"}, 1),
+        makePDB("pdb2", "ns3", map[string]string{"tkey3": "tval3", "tkey2": "tval2"}, 10),
+    }
+    updatedPDBs := []*v1beta1.PodDisruptionBudget{
+        makePDB("pdb0", "ns1", map[string]string{"tkey4": "tval4"}, 8),
+        makePDB("pdb1", "ns1", map[string]string{"tkey1": "tval1"}, 1),
+        makePDB("pdb2", "ns3", map[string]string{"tkey3": "tval3", "tkey1": "tval1", "tkey2": "tval2"}, 10),
+    }
+    tests := []struct {
+        pdbsToAdd []*v1beta1.PodDisruptionBudget
+        pdbsToUpdate []*v1beta1.PodDisruptionBudget
+        pdbsToDelete []*v1beta1.PodDisruptionBudget
+        expectedPDBs []*v1beta1.PodDisruptionBudget // Expected PDBs after all operations
+    }{
+        {
+            pdbsToAdd: []*v1beta1.PodDisruptionBudget{testPDBs[0]},
+            pdbsToUpdate: []*v1beta1.PodDisruptionBudget{testPDBs[0], testPDBs[1], testPDBs[0]},
+            expectedPDBs: []*v1beta1.PodDisruptionBudget{testPDBs[0], testPDBs[1]}, // both will be in the cache as they have different names
+        },
+        {
+            pdbsToAdd: []*v1beta1.PodDisruptionBudget{testPDBs[0]},
+            pdbsToUpdate: []*v1beta1.PodDisruptionBudget{testPDBs[0], updatedPDBs[0]},
+            expectedPDBs: []*v1beta1.PodDisruptionBudget{updatedPDBs[0]},
+        },
+        {
+            pdbsToAdd: []*v1beta1.PodDisruptionBudget{testPDBs[0], testPDBs[2]},
+            pdbsToUpdate: []*v1beta1.PodDisruptionBudget{testPDBs[0], updatedPDBs[0]},
+            pdbsToDelete: []*v1beta1.PodDisruptionBudget{testPDBs[0]},
+            expectedPDBs: []*v1beta1.PodDisruptionBudget{testPDBs[2]},
+        },
+    }
+
+    for _, test := range tests {
+        cache := newSchedulerCache(ttl, time.Second, nil)
+        for _, pdbToAdd := range test.pdbsToAdd {
+            if err := cache.AddPDB(pdbToAdd); err != nil {
+                t.Fatalf("AddPDB failed: %v", err)
+            }
+        }
+
+        for i := range test.pdbsToUpdate {
+            if i == 0 {
+                continue
+            }
+            if err := cache.UpdatePDB(test.pdbsToUpdate[i-1], test.pdbsToUpdate[i]); err != nil {
+                t.Fatalf("UpdatePDB failed: %v", err)
+            }
+        }
+
+        for _, pdb := range test.pdbsToDelete {
+            if err := cache.RemovePDB(pdb); err != nil {
+                t.Fatalf("RemovePDB failed: %v", err)
+            }
+        }
+
+        cachedPDBs, err := cache.ListPDBs(labels.Everything())
+        if err != nil {
+            t.Fatalf("ListPDBs failed: %v", err)
+        }
+        if len(cachedPDBs) != len(test.expectedPDBs) {
+            t.Errorf("Expected %d PDBs, got %d", len(test.expectedPDBs), len(cachedPDBs))
+        }
+        for _, pdb := range test.expectedPDBs {
+            found := false
+            // find it among the cached ones
+            for _, cpdb := range cachedPDBs {
+                if pdb.Name == cpdb.Name {
+                    found = true
+                    if !reflect.DeepEqual(pdb, cpdb) {
+                        t.Errorf("%v is not equal to %v", pdb, cpdb)
+                    }
+                    break
+                }
+            }
+            if !found {
+                t.Errorf("PDB with name '%v' was not found in the cache.", pdb.Name)
+            }
+
+        }
+    }
+}

@@ -18,6 +18,7 @@ package schedulercache

 import (
     "k8s.io/api/core/v1"
+    policy "k8s.io/api/policy/v1beta1"
     "k8s.io/apimachinery/pkg/labels"
 )

@@ -95,6 +96,18 @@ type Cache interface {
     // RemoveNode removes overall information about node.
     RemoveNode(node *v1.Node) error

+    // AddPDB adds a PodDisruptionBudget object to the cache.
+    AddPDB(pdb *policy.PodDisruptionBudget) error
+
+    // UpdatePDB updates a PodDisruptionBudget object in the cache.
+    UpdatePDB(oldPDB, newPDB *policy.PodDisruptionBudget) error
+
+    // RemovePDB removes a PodDisruptionBudget object from the cache.
+    RemovePDB(pdb *policy.PodDisruptionBudget) error
+
+    // List lists all cached PDBs matching the selector.
+    ListPDBs(selector labels.Selector) ([]*policy.PodDisruptionBudget, error)
+
     // UpdateNodeNameToInfoMap updates the passed infoMap to the current contents of Cache.
     // The node info contains aggregated information of pods scheduled (including assumed to be)
     // on this node.

@@ -19,6 +19,7 @@ go_library(
         "//vendor/k8s.io/api/apps/v1beta1:go_default_library",
         "//vendor/k8s.io/api/core/v1:go_default_library",
         "//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
+        "//vendor/k8s.io/api/policy/v1beta1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
     ],

@@ -18,6 +18,7 @@ package testing

 import (
     "k8s.io/api/core/v1"
+    policy "k8s.io/api/policy/v1beta1"
     "k8s.io/apimachinery/pkg/labels"
     "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
 )
@@ -66,6 +67,16 @@ func (f *FakeCache) UpdateNodeNameToInfoMap(infoMap map[string]*schedulercache.N
     return nil
 }

+func (f *FakeCache) AddPDB(pdb *policy.PodDisruptionBudget) error { return nil }
+
+func (f *FakeCache) UpdatePDB(oldPDB, newPDB *policy.PodDisruptionBudget) error { return nil }
+
+func (f *FakeCache) RemovePDB(pdb *policy.PodDisruptionBudget) error { return nil }
+
+func (f *FakeCache) ListPDBs(selector labels.Selector) ([]*policy.PodDisruptionBudget, error) {
+    return nil, nil
+}
+
 func (f *FakeCache) List(s labels.Selector) ([]*v1.Pod, error) { return nil, nil }

 func (f *FakeCache) FilteredList(filter schedulercache.PodFilter, selector labels.Selector) ([]*v1.Pod, error) {

@@ -44,9 +44,13 @@ go_test(
         "//test/integration/framework:go_default_library",
         "//test/utils:go_default_library",
         "//vendor/k8s.io/api/core/v1:go_default_library",
+        "//vendor/k8s.io/api/policy/v1beta1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/util/diff:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//vendor/k8s.io/client-go/informers:go_default_library",

@@ -368,6 +368,7 @@ func TestSchedulerExtender(t *testing.T) {
         informerFactory.Extensions().V1beta1().ReplicaSets(),
         informerFactory.Apps().V1beta1().StatefulSets(),
         informerFactory.Core().V1().Services(),
+        informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
         v1.DefaultHardPodAffinitySymmetricWeight,
         enableEquivalenceCache,
     )

@@ -20,13 +20,18 @@ package scheduler

 import (
     "fmt"
     "reflect"
     "testing"
     "time"

     "k8s.io/api/core/v1"
+    policy "k8s.io/api/policy/v1beta1"
     "k8s.io/apimachinery/pkg/api/errors"
     "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/labels"
+    "k8s.io/apimachinery/pkg/util/diff"
+    "k8s.io/apimachinery/pkg/util/intstr"
+    "k8s.io/apimachinery/pkg/util/wait"
     utilfeature "k8s.io/apiserver/pkg/util/feature"
     "k8s.io/client-go/informers"
@@ -134,6 +139,7 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) {
         informerFactory.Extensions().V1beta1().ReplicaSets(),
         informerFactory.Apps().V1beta1().StatefulSets(),
         informerFactory.Core().V1().Services(),
+        informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
         eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: v1.DefaultSchedulerName}),
     )
     if err != nil {
@@ -186,6 +192,7 @@ func TestSchedulerCreationFromNonExistentConfigMap(t *testing.T) {
         informerFactory.Extensions().V1beta1().ReplicaSets(),
         informerFactory.Apps().V1beta1().StatefulSets(),
         informerFactory.Core().V1().Services(),
+        informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
         eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: v1.DefaultSchedulerName}),
     )

@@ -224,6 +231,7 @@ func TestSchedulerCreationInLegacyMode(t *testing.T) {
         informerFactory.Extensions().V1beta1().ReplicaSets(),
         informerFactory.Apps().V1beta1().StatefulSets(),
         informerFactory.Core().V1().Services(),
+        informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
         eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: v1.DefaultSchedulerName}),
     )
     if err != nil {
@@ -509,6 +517,7 @@ func TestMultiScheduler(t *testing.T) {
         informerFactory2.Extensions().V1beta1().ReplicaSets(),
         informerFactory2.Apps().V1beta1().StatefulSets(),
         informerFactory2.Core().V1().Services(),
+        informerFactory2.Policy().V1beta1().PodDisruptionBudgets(),
         v1.DefaultHardPodAffinitySymmetricWeight,
         enableEquivalenceCache,
     )
@@ -900,3 +909,91 @@ func TestPreemption(t *testing.T) {
     }
     }
 }
+
+// TestPDBCache verifies that scheduler cache works as expected when handling
+// PodDisruptionBudget.
+func TestPDBCache(t *testing.T) {
+    context := initTest(t, "pdbcache")
+    defer cleanupTest(t, context)
+
+    intstrMin := intstr.FromInt(4)
+    pdb := &policy.PodDisruptionBudget{
+        ObjectMeta: metav1.ObjectMeta{
+            Namespace: context.ns.Name,
+            Name: "test-pdb",
+            Labels: map[string]string{"tkey1": "tval1", "tkey2": "tval2"},
+        },
+        Spec: policy.PodDisruptionBudgetSpec{
+            MinAvailable: &intstrMin,
+            Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"tkey": "tvalue"}},
+        },
+    }
+
+    createdPDB, err := context.clientSet.PolicyV1beta1().PodDisruptionBudgets(context.ns.Name).Create(pdb)
+    if err != nil {
+        t.Errorf("Failed to create PDB: %v", err)
+    }
+    // Wait for PDB to show up in the scheduler's cache.
+    if err = wait.Poll(time.Second, 15*time.Second, func() (bool, error) {
+        cachedPDBs, err := context.scheduler.Config().SchedulerCache.ListPDBs(labels.Everything())
+        if err != nil {
+            t.Errorf("Error while polling for PDB: %v", err)
+            return false, err
+        }
+        return len(cachedPDBs) > 0, err
+    }); err != nil {
+        t.Fatalf("No PDB was added to the cache: %v", err)
+    }
+    // Read PDB from the cache and compare it.
+    cachedPDBs, err := context.scheduler.Config().SchedulerCache.ListPDBs(labels.Everything())
+    if len(cachedPDBs) != 1 {
+        t.Fatalf("Expected to have 1 pdb in cache, but found %d.", len(cachedPDBs))
+    }
+    if !reflect.DeepEqual(createdPDB, cachedPDBs[0]) {
+        t.Errorf("Got different PDB than expected.\nDifference detected on:\n%s", diff.ObjectReflectDiff(createdPDB, cachedPDBs[0]))
+    }
+
+    // Update PDB and change its labels.
+    pdbCopy := *cachedPDBs[0]
+    pdbCopy.Labels = map[string]string{}
+    updatedPDB, err := context.clientSet.PolicyV1beta1().PodDisruptionBudgets(context.ns.Name).Update(&pdbCopy)
+    if err != nil {
+        t.Errorf("Failed to update PDB: %v", err)
+    }
+    // Wait for PDB to be updated in the scheduler's cache.
+    if err = wait.Poll(time.Second, 15*time.Second, func() (bool, error) {
+        cachedPDBs, err := context.scheduler.Config().SchedulerCache.ListPDBs(labels.Everything())
+        if err != nil {
+            t.Errorf("Error while polling for PDB: %v", err)
+            return false, err
+        }
+        return len(cachedPDBs[0].Labels) == 0, err
+    }); err != nil {
+        t.Fatalf("No PDB was updated in the cache: %v", err)
+    }
+    // Read PDB from the cache and compare it.
+    cachedPDBs, err = context.scheduler.Config().SchedulerCache.ListPDBs(labels.Everything())
+    if len(cachedPDBs) != 1 {
+        t.Errorf("Expected to have 1 pdb in cache, but found %d.", len(cachedPDBs))
+    }
+    if !reflect.DeepEqual(updatedPDB, cachedPDBs[0]) {
+        t.Errorf("Got different PDB than expected.\nDifference detected on:\n%s", diff.ObjectReflectDiff(updatedPDB, cachedPDBs[0]))
+    }
+
+    // Delete PDB.
+    err = context.clientSet.PolicyV1beta1().PodDisruptionBudgets(context.ns.Name).Delete(pdb.Name, &metav1.DeleteOptions{})
+    if err != nil {
+        t.Errorf("Failed to delete PDB: %v", err)
+    }
+    // Wait for PDB to be deleted from the scheduler's cache.
+    if err = wait.Poll(time.Second, 15*time.Second, func() (bool, error) {
+        cachedPDBs, err := context.scheduler.Config().SchedulerCache.ListPDBs(labels.Everything())
+        if err != nil {
+            t.Errorf("Error while polling for PDB: %v", err)
+            return false, err
+        }
+        return len(cachedPDBs) == 0, err
+    }); err != nil {
+        t.Errorf("No PDB was deleted from the cache: %v", err)
+    }
+}

@@ -130,6 +130,7 @@ func TestTaintNodeByCondition(t *testing.T) {
         informers.Extensions().V1beta1().ReplicaSets(),
         informers.Apps().V1beta1().StatefulSets(),
         informers.Core().V1().Services(),
+        informers.Policy().V1beta1().PodDisruptionBudgets(),
         v1.DefaultHardPodAffinitySymmetricWeight,
         true, // Enable EqualCache by default.
     )

@@ -76,6 +76,7 @@ func initTest(t *testing.T, nsPrefix string) *TestContext {
         context.informerFactory.Extensions().V1beta1().ReplicaSets(),
         context.informerFactory.Apps().V1beta1().StatefulSets(),
         context.informerFactory.Core().V1().Services(),
+        context.informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
         v1.DefaultHardPodAffinitySymmetricWeight,
         true,
     )

@@ -74,6 +74,7 @@ func mustSetupScheduler() (schedulerConfigurator scheduler.Configurator, destroy
         informerFactory.Extensions().V1beta1().ReplicaSets(),
         informerFactory.Apps().V1beta1().StatefulSets(),
         informerFactory.Core().V1().Services(),
+        informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
         v1.DefaultHardPodAffinitySymmetricWeight,
         enableEquivalenceCache,
     )