Switch disruption controller to shared informers

This commit is contained in:
Andy Goldstein 2017-02-08 13:20:31 -05:00
parent e9de1b0221
commit 74186d3e06
7 changed files with 185 additions and 181 deletions

View File

@ -30,7 +30,12 @@ func startDisruptionController(ctx ControllerContext) (bool, error) {
return false, nil
}
go disruption.NewDisruptionController(
ctx.InformerFactory.Pods().Informer(),
ctx.NewInformerFactory.Core().V1().Pods(),
ctx.NewInformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
ctx.NewInformerFactory.Core().V1().ReplicationControllers(),
ctx.NewInformerFactory.Extensions().V1beta1().ReplicaSets(),
ctx.NewInformerFactory.Extensions().V1beta1().Deployments(),
ctx.NewInformerFactory.Apps().V1beta1().StatefulSets(),
ctx.ClientBuilder.ClientOrDie("disruption-controller"),
).Run(ctx.Stop)
return true, nil

View File

@ -15,21 +15,25 @@ go_library(
deps = [
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/apis/apps/v1beta1:go_default_library",
"//pkg/apis/extensions/v1beta1:go_default_library",
"//pkg/apis/policy/v1beta1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/clientset_generated/clientset/typed/policy/v1beta1:go_default_library",
"//pkg/client/legacylisters:go_default_library",
"//pkg/client/informers/informers_generated/apps/v1beta1:go_default_library",
"//pkg/client/informers/informers_generated/core/v1:go_default_library",
"//pkg/client/informers/informers_generated/extensions/v1beta1:go_default_library",
"//pkg/client/informers/informers_generated/policy/v1beta1:go_default_library",
"//pkg/client/listers/apps/v1beta1:go_default_library",
"//pkg/client/listers/core/v1:go_default_library",
"//pkg/client/listers/extensions/v1beta1:go_default_library",
"//pkg/client/listers/policy/v1beta1:go_default_library",
"//pkg/controller:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:k8s.io/apimachinery/pkg/api/errors",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/types",
"//vendor:k8s.io/apimachinery/pkg/util/intstr",
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/apimachinery/pkg/watch",
"//vendor:k8s.io/client-go/kubernetes/typed/core/v1",
"//vendor:k8s.io/client-go/pkg/api/v1",
"//vendor:k8s.io/client-go/tools/cache",
@ -49,14 +53,12 @@ go_test(
"//pkg/apis/apps/v1beta1:go_default_library",
"//pkg/apis/extensions/v1beta1:go_default_library",
"//pkg/apis/policy/v1beta1:go_default_library",
"//pkg/client/legacylisters:go_default_library",
"//pkg/client/informers/informers_generated:go_default_library",
"//pkg/controller:go_default_library",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/util/intstr",
"//vendor:k8s.io/apimachinery/pkg/util/uuid",
"//vendor:k8s.io/client-go/pkg/api/v1",
"//vendor:k8s.io/client-go/tools/cache",
"//vendor:k8s.io/client-go/tools/record",
"//vendor:k8s.io/client-go/util/workqueue",
],
)

View File

@ -21,13 +21,12 @@ import (
"reflect"
"time"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
clientv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/cache"
@ -35,12 +34,17 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
policy "k8s.io/kubernetes/pkg/apis/policy/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
policyclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/policy/v1beta1"
"k8s.io/kubernetes/pkg/client/legacylisters"
appsinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/apps/v1beta1"
coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/core/v1"
extensionsinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/extensions/v1beta1"
policyinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/policy/v1beta1"
appslisters "k8s.io/kubernetes/pkg/client/listers/apps/v1beta1"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
extensionslisters "k8s.io/kubernetes/pkg/client/listers/extensions/v1beta1"
policylisters "k8s.io/kubernetes/pkg/client/listers/policy/v1beta1"
"k8s.io/kubernetes/pkg/controller"
"github.com/golang/glog"
@ -64,28 +68,23 @@ type updater func(*policy.PodDisruptionBudget) error
type DisruptionController struct {
kubeClient clientset.Interface
pdbStore cache.Store
pdbController cache.Controller
pdbLister listers.StoreToPodDisruptionBudgetLister
pdbLister policylisters.PodDisruptionBudgetLister
pdbListerSynced cache.InformerSynced
podController cache.Controller
podLister listers.StoreToPodLister
podLister corelisters.PodLister
podListerSynced cache.InformerSynced
rcIndexer cache.Indexer
rcController cache.Controller
rcLister listers.StoreToReplicationControllerLister
rcLister corelisters.ReplicationControllerLister
rcListerSynced cache.InformerSynced
rsStore cache.Store
rsController cache.Controller
rsLister listers.StoreToReplicaSetLister
rsLister extensionslisters.ReplicaSetLister
rsListerSynced cache.InformerSynced
dIndexer cache.Indexer
dController cache.Controller
dLister listers.StoreToDeploymentLister
dLister extensionslisters.DeploymentLister
dListerSynced cache.InformerSynced
ssStore cache.Store
ssController cache.Controller
ssLister listers.StoreToStatefulSetLister
ssLister appslisters.StatefulSetLister
ssListerSynced cache.InformerSynced
// PodDisruptionBudget keys that need to be synced.
queue workqueue.RateLimitingInterface
@ -108,108 +107,55 @@ type controllerAndScale struct {
// controllers and their scale.
type podControllerFinder func(*v1.Pod) ([]controllerAndScale, error)
func NewDisruptionController(podInformer cache.SharedIndexInformer, kubeClient clientset.Interface) *DisruptionController {
func NewDisruptionController(
podInformer coreinformers.PodInformer,
pdbInformer policyinformers.PodDisruptionBudgetInformer,
rcInformer coreinformers.ReplicationControllerInformer,
rsInformer extensionsinformers.ReplicaSetInformer,
dInformer extensionsinformers.DeploymentInformer,
ssInformer appsinformers.StatefulSetInformer,
kubeClient clientset.Interface,
) *DisruptionController {
dc := &DisruptionController{
kubeClient: kubeClient,
podController: podInformer.GetController(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "disruption"),
recheckQueue: workqueue.NewNamedDelayingQueue("disruption-recheck"),
broadcaster: record.NewBroadcaster(),
kubeClient: kubeClient,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "disruption"),
recheckQueue: workqueue.NewNamedDelayingQueue("disruption-recheck"),
broadcaster: record.NewBroadcaster(),
}
dc.recorder = dc.broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "controllermanager"})
dc.getUpdater = func() updater { return dc.writePdbStatus }
dc.podLister.Indexer = podInformer.GetIndexer()
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dc.addPod,
UpdateFunc: dc.updatePod,
DeleteFunc: dc.deletePod,
})
dc.podLister = podInformer.Lister()
dc.podListerSynced = podInformer.Informer().HasSynced
dc.pdbStore, dc.pdbController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return dc.kubeClient.Policy().PodDisruptionBudgets(metav1.NamespaceAll).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return dc.kubeClient.Policy().PodDisruptionBudgets(metav1.NamespaceAll).Watch(options)
},
},
&policy.PodDisruptionBudget{},
30*time.Second,
pdbInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: dc.addDb,
UpdateFunc: dc.updateDb,
DeleteFunc: dc.removeDb,
},
)
dc.pdbLister.Store = dc.pdbStore
dc.rcIndexer, dc.rcController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return dc.kubeClient.Core().ReplicationControllers(metav1.NamespaceAll).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return dc.kubeClient.Core().ReplicationControllers(metav1.NamespaceAll).Watch(options)
},
},
&v1.ReplicationController{},
30*time.Second,
cache.ResourceEventHandlerFuncs{},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
dc.pdbLister = pdbInformer.Lister()
dc.pdbListerSynced = pdbInformer.Informer().HasSynced
dc.rcLister.Indexer = dc.rcIndexer
dc.rcLister = rcInformer.Lister()
dc.rcListerSynced = rcInformer.Informer().HasSynced
dc.rsLister.Indexer, dc.rsController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return dc.kubeClient.Extensions().ReplicaSets(metav1.NamespaceAll).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return dc.kubeClient.Extensions().ReplicaSets(metav1.NamespaceAll).Watch(options)
},
},
&extensions.ReplicaSet{},
30*time.Second,
cache.ResourceEventHandlerFuncs{},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
dc.rsStore = dc.rsLister.Indexer
dc.rsLister = rsInformer.Lister()
dc.rsListerSynced = rsInformer.Informer().HasSynced
dc.dIndexer, dc.dController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return dc.kubeClient.Extensions().Deployments(metav1.NamespaceAll).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return dc.kubeClient.Extensions().Deployments(metav1.NamespaceAll).Watch(options)
},
},
&extensions.Deployment{},
30*time.Second,
cache.ResourceEventHandlerFuncs{},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
dc.dLister.Indexer = dc.dIndexer
dc.dLister = dInformer.Lister()
dc.dListerSynced = dInformer.Informer().HasSynced
dc.ssStore, dc.ssController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return dc.kubeClient.Apps().StatefulSets(metav1.NamespaceAll).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return dc.kubeClient.Apps().StatefulSets(metav1.NamespaceAll).Watch(options)
},
},
&apps.StatefulSet{},
30*time.Second,
cache.ResourceEventHandlerFuncs{},
)
dc.ssLister.Store = dc.ssStore
dc.ssLister = ssInformer.Lister()
dc.ssListerSynced = ssInformer.Informer().HasSynced
return dc
}
@ -317,19 +263,22 @@ func (dc *DisruptionController) getPodReplicationControllers(pod *v1.Pod) ([]con
}
func (dc *DisruptionController) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer dc.queue.ShutDown()
glog.V(0).Infof("Starting disruption controller")
if !cache.WaitForCacheSync(stopCh, dc.podListerSynced, dc.pdbListerSynced, dc.rcListerSynced, dc.rsListerSynced, dc.dListerSynced, dc.ssListerSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
return
}
if dc.kubeClient != nil {
glog.V(0).Infof("Sending events to api server.")
dc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(dc.kubeClient.Core().RESTClient()).Events("")})
} else {
glog.V(0).Infof("No api server defined - no events will be sent to API server.")
}
go dc.pdbController.Run(stopCh)
go dc.podController.Run(stopCh)
go dc.rcController.Run(stopCh)
go dc.rsController.Run(stopCh)
go dc.dController.Run(stopCh)
go dc.ssController.Run(stopCh)
go wait.Until(dc.worker, time.Second, stopCh)
go wait.Until(dc.recheckWorker, time.Second, stopCh)
@ -442,7 +391,7 @@ func (dc *DisruptionController) getPdbForPod(pod *v1.Pod) *policy.PodDisruptionB
glog.Warning(msg)
dc.recorder.Event(pod, v1.EventTypeWarning, "MultiplePodDisruptionBudgets", msg)
}
return &pdbs[0]
return pdbs[0]
}
func (dc *DisruptionController) getPodsForPdb(pdb *policy.PodDisruptionBudget) ([]*v1.Pod, error) {
@ -510,8 +459,12 @@ func (dc *DisruptionController) sync(key string) error {
glog.V(4).Infof("Finished syncing PodDisruptionBudget %q (%v)", key, time.Now().Sub(startTime))
}()
obj, exists, err := dc.pdbLister.Store.GetByKey(key)
if !exists {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
pdb, err := dc.pdbLister.PodDisruptionBudgets(namespace).Get(name)
if errors.IsNotFound(err) {
glog.V(4).Infof("PodDisruptionBudget %q has been deleted", key)
return nil
}
@ -519,8 +472,6 @@ func (dc *DisruptionController) sync(key string) error {
return err
}
pdb := obj.(*policy.PodDisruptionBudget)
if err := dc.trySync(pdb); err != nil {
glog.Errorf("Failed to sync pdb %s/%s: %v", pdb.Namespace, pdb.Name, err)
return dc.failSafe(pdb)

View File

@ -26,21 +26,21 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/uuid"
clientv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
policy "k8s.io/kubernetes/pkg/apis/policy/v1beta1"
"k8s.io/kubernetes/pkg/client/legacylisters"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated"
"k8s.io/kubernetes/pkg/controller"
)
type pdbStates map[string]policy.PodDisruptionBudget
var alwaysReady = func() bool { return true }
func (ps *pdbStates) Set(pdb *policy.PodDisruptionBudget) error {
key, err := controller.KeyFunc(pdb)
if err != nil {
@ -85,23 +85,48 @@ func (ps *pdbStates) VerifyDisruptionAllowed(t *testing.T, key string, disruptio
}
}
func newFakeDisruptionController() (*DisruptionController, *pdbStates) {
type disruptionController struct {
*DisruptionController
podStore cache.Store
pdbStore cache.Store
rcStore cache.Store
rsStore cache.Store
dStore cache.Store
ssStore cache.Store
}
func newFakeDisruptionController() (*disruptionController, *pdbStates) {
ps := &pdbStates{}
dc := &DisruptionController{
pdbLister: listers.StoreToPodDisruptionBudgetLister{Store: cache.NewStore(controller.KeyFunc)},
podLister: listers.StoreToPodLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{})},
rcLister: listers.StoreToReplicationControllerLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
rsLister: listers.StoreToReplicaSetLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
dLister: listers.StoreToDeploymentLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{})},
ssLister: listers.StoreToStatefulSetLister{Store: cache.NewStore(controller.KeyFunc)},
getUpdater: func() updater { return ps.Set },
broadcaster: record.NewBroadcaster(),
}
informerFactory := informers.NewSharedInformerFactory(nil, nil, controller.NoResyncPeriodFunc())
dc.recorder = dc.broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "disruption_test"})
dc := NewDisruptionController(
informerFactory.Core().V1().Pods(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Extensions().V1beta1().Deployments(),
informerFactory.Apps().V1beta1().StatefulSets(),
nil,
)
dc.getUpdater = func() updater { return ps.Set }
dc.podListerSynced = alwaysReady
dc.pdbListerSynced = alwaysReady
dc.rcListerSynced = alwaysReady
dc.rsListerSynced = alwaysReady
dc.dListerSynced = alwaysReady
dc.ssListerSynced = alwaysReady
return dc, ps
return &disruptionController{
dc,
informerFactory.Core().V1().Pods().Informer().GetStore(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets().Informer().GetStore(),
informerFactory.Core().V1().ReplicationControllers().Informer().GetStore(),
informerFactory.Extensions().V1beta1().ReplicaSets().Informer().GetStore(),
informerFactory.Extensions().V1beta1().Deployments().Informer().GetStore(),
informerFactory.Apps().V1beta1().StatefulSets().Informer().GetStore(),
}, ps
}
func fooBar() map[string]string {
@ -283,11 +308,11 @@ func TestNoSelector(t *testing.T) {
pdb.Spec.Selector = &metav1.LabelSelector{}
pod, _ := newPod(t, "yo-yo-yo")
add(t, dc.pdbLister.Store, pdb)
add(t, dc.pdbStore, pdb)
dc.sync(pdbName)
ps.VerifyPdbStatus(t, pdbName, 0, 0, 3, 0, map[string]metav1.Time{})
add(t, dc.podLister.Indexer, pod)
add(t, dc.podStore, pod)
dc.sync(pdbName)
ps.VerifyPdbStatus(t, pdbName, 0, 0, 3, 0, map[string]metav1.Time{})
}
@ -298,7 +323,7 @@ func TestUnavailable(t *testing.T) {
dc, ps := newFakeDisruptionController()
pdb, pdbName := newPodDisruptionBudget(t, intstr.FromInt(3))
add(t, dc.pdbLister.Store, pdb)
add(t, dc.pdbStore, pdb)
dc.sync(pdbName)
// Add three pods, verifying that the counts go up at each step.
@ -307,14 +332,14 @@ func TestUnavailable(t *testing.T) {
ps.VerifyPdbStatus(t, pdbName, 0, i, 3, i, map[string]metav1.Time{})
pod, _ := newPod(t, fmt.Sprintf("yo-yo-yo %d", i))
pods = append(pods, pod)
add(t, dc.podLister.Indexer, pod)
add(t, dc.podStore, pod)
dc.sync(pdbName)
}
ps.VerifyPdbStatus(t, pdbName, 1, 4, 3, 4, map[string]metav1.Time{})
// Now set one pod as unavailable
pods[0].Status.Conditions = []v1.PodCondition{}
update(t, dc.podLister.Indexer, pods[0])
update(t, dc.podStore, pods[0])
dc.sync(pdbName)
// Verify expected update
@ -327,13 +352,13 @@ func TestNakedPod(t *testing.T) {
dc, ps := newFakeDisruptionController()
pdb, pdbName := newPodDisruptionBudget(t, intstr.FromString("28%"))
add(t, dc.pdbLister.Store, pdb)
add(t, dc.pdbStore, pdb)
dc.sync(pdbName)
// This verifies that when a PDB has 0 pods, disruptions are not allowed.
ps.VerifyDisruptionAllowed(t, pdbName, 0)
pod, _ := newPod(t, "naked")
add(t, dc.podLister.Indexer, pod)
add(t, dc.podStore, pod)
dc.sync(pdbName)
ps.VerifyDisruptionAllowed(t, pdbName, 0)
@ -344,13 +369,13 @@ func TestReplicaSet(t *testing.T) {
dc, ps := newFakeDisruptionController()
pdb, pdbName := newPodDisruptionBudget(t, intstr.FromString("20%"))
add(t, dc.pdbLister.Store, pdb)
add(t, dc.pdbStore, pdb)
rs, _ := newReplicaSet(t, 10)
add(t, dc.rsLister.Indexer, rs)
add(t, dc.rsStore, rs)
pod, _ := newPod(t, "pod")
add(t, dc.podLister.Indexer, pod)
add(t, dc.podStore, pod)
dc.sync(pdbName)
ps.VerifyPdbStatus(t, pdbName, 0, 1, 2, 10, map[string]metav1.Time{})
}
@ -363,11 +388,11 @@ func TestMultipleControllers(t *testing.T) {
dc, ps := newFakeDisruptionController()
pdb, pdbName := newPodDisruptionBudget(t, intstr.FromString("1%"))
add(t, dc.pdbLister.Store, pdb)
add(t, dc.pdbStore, pdb)
for i := 0; i < podCount; i++ {
pod, _ := newPod(t, fmt.Sprintf("pod %d", i))
add(t, dc.podLister.Indexer, pod)
add(t, dc.podStore, pod)
}
dc.sync(pdbName)
@ -376,7 +401,7 @@ func TestMultipleControllers(t *testing.T) {
rc, _ := newReplicationController(t, 1)
rc.Name = "rc 1"
add(t, dc.rcLister.Indexer, rc)
add(t, dc.rcStore, rc)
dc.sync(pdbName)
// One RC and 200%>1% healthy => disruption allowed
@ -384,7 +409,7 @@ func TestMultipleControllers(t *testing.T) {
rc, _ = newReplicationController(t, 1)
rc.Name = "rc 2"
add(t, dc.rcLister.Indexer, rc)
add(t, dc.rcStore, rc)
dc.sync(pdbName)
// 100%>1% healthy BUT two RCs => no disruption allowed
@ -405,10 +430,10 @@ func TestReplicationController(t *testing.T) {
// 34% should round up to 2
pdb, pdbName := newPodDisruptionBudget(t, intstr.FromString("34%"))
add(t, dc.pdbLister.Store, pdb)
add(t, dc.pdbStore, pdb)
rc, _ := newReplicationController(t, 3)
rc.Spec.Selector = labels
add(t, dc.rcLister.Indexer, rc)
add(t, dc.rcStore, rc)
dc.sync(pdbName)
// It starts out at 0 expected because, with no pods, the PDB doesn't know
@ -421,7 +446,7 @@ func TestReplicationController(t *testing.T) {
pod, _ := newPod(t, fmt.Sprintf("foobar %d", i))
pods = append(pods, pod)
pod.Labels = labels
add(t, dc.podLister.Indexer, pod)
add(t, dc.podStore, pod)
dc.sync(pdbName)
if i < 2 {
ps.VerifyPdbStatus(t, pdbName, 0, i+1, 2, 3, map[string]metav1.Time{})
@ -431,7 +456,7 @@ func TestReplicationController(t *testing.T) {
}
rogue, _ := newPod(t, "rogue")
add(t, dc.podLister.Indexer, rogue)
add(t, dc.podStore, rogue)
dc.sync(pdbName)
ps.VerifyDisruptionAllowed(t, pdbName, 0)
}
@ -446,9 +471,9 @@ func TestStatefulSetController(t *testing.T) {
// 34% should round up to 2
pdb, pdbName := newPodDisruptionBudget(t, intstr.FromString("34%"))
add(t, dc.pdbLister.Store, pdb)
add(t, dc.pdbStore, pdb)
ss, _ := newStatefulSet(t, 3)
add(t, dc.ssLister.Store, ss)
add(t, dc.ssStore, ss)
dc.sync(pdbName)
// It starts out at 0 expected because, with no pods, the PDB doesn't know
@ -461,7 +486,7 @@ func TestStatefulSetController(t *testing.T) {
pod, _ := newPod(t, fmt.Sprintf("foobar %d", i))
pods = append(pods, pod)
pod.Labels = labels
add(t, dc.podLister.Indexer, pod)
add(t, dc.podStore, pod)
dc.sync(pdbName)
if i < 2 {
ps.VerifyPdbStatus(t, pdbName, 0, i+1, 2, 3, map[string]metav1.Time{})
@ -494,10 +519,10 @@ func TestTwoControllers(t *testing.T) {
const minimumTwo int32 = 7 // integer minimum with two controllers
pdb, pdbName := newPodDisruptionBudget(t, intstr.FromString("28%"))
add(t, dc.pdbLister.Store, pdb)
add(t, dc.pdbStore, pdb)
rc, _ := newReplicationController(t, collectionSize)
rc.Spec.Selector = rcLabels
add(t, dc.rcLister.Indexer, rc)
add(t, dc.rcStore, rc)
dc.sync(pdbName)
ps.VerifyPdbStatus(t, pdbName, 0, 0, 0, 0, map[string]metav1.Time{})
@ -512,7 +537,7 @@ func TestTwoControllers(t *testing.T) {
if i <= unavailablePods {
pod.Status.Conditions = []v1.PodCondition{}
}
add(t, dc.podLister.Indexer, pod)
add(t, dc.podStore, pod)
dc.sync(pdbName)
if i <= unavailablePods {
ps.VerifyPdbStatus(t, pdbName, 0, 0, minimumOne, collectionSize, map[string]metav1.Time{})
@ -525,14 +550,14 @@ func TestTwoControllers(t *testing.T) {
d, _ := newDeployment(t, collectionSize)
d.Spec.Selector = newSel(dLabels)
add(t, dc.dLister.Indexer, d)
add(t, dc.dStore, d)
dc.sync(pdbName)
ps.VerifyPdbStatus(t, pdbName, 1, minimumOne+1, minimumOne, collectionSize, map[string]metav1.Time{})
rs, _ := newReplicaSet(t, collectionSize)
rs.Spec.Selector = newSel(dLabels)
rs.Labels = dLabels
add(t, dc.rsLister.Indexer, rs)
add(t, dc.rsStore, rs)
dc.sync(pdbName)
ps.VerifyPdbStatus(t, pdbName, 1, minimumOne+1, minimumOne, collectionSize, map[string]metav1.Time{})
@ -545,7 +570,7 @@ func TestTwoControllers(t *testing.T) {
if i <= unavailablePods {
pod.Status.Conditions = []v1.PodCondition{}
}
add(t, dc.podLister.Indexer, pod)
add(t, dc.podStore, pod)
dc.sync(pdbName)
if i <= unavailablePods {
ps.VerifyPdbStatus(t, pdbName, 0, minimumOne+1, minimumTwo, 2*collectionSize, map[string]metav1.Time{})
@ -562,17 +587,17 @@ func TestTwoControllers(t *testing.T) {
// verify that a disruption is permitted again.
ps.VerifyPdbStatus(t, pdbName, 2, 2+minimumTwo, minimumTwo, 2*collectionSize, map[string]metav1.Time{})
pods[collectionSize-1].Status.Conditions = []v1.PodCondition{}
update(t, dc.podLister.Indexer, pods[collectionSize-1])
update(t, dc.podStore, pods[collectionSize-1])
dc.sync(pdbName)
ps.VerifyPdbStatus(t, pdbName, 1, 1+minimumTwo, minimumTwo, 2*collectionSize, map[string]metav1.Time{})
pods[collectionSize-2].Status.Conditions = []v1.PodCondition{}
update(t, dc.podLister.Indexer, pods[collectionSize-2])
update(t, dc.podStore, pods[collectionSize-2])
dc.sync(pdbName)
ps.VerifyPdbStatus(t, pdbName, 0, minimumTwo, minimumTwo, 2*collectionSize, map[string]metav1.Time{})
pods[collectionSize-1].Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}}
update(t, dc.podLister.Indexer, pods[collectionSize-1])
update(t, dc.podStore, pods[collectionSize-1])
dc.sync(pdbName)
ps.VerifyPdbStatus(t, pdbName, 1, 1+minimumTwo, minimumTwo, 2*collectionSize, map[string]metav1.Time{})
}
@ -581,7 +606,7 @@ func TestTwoControllers(t *testing.T) {
func TestPDBNotExist(t *testing.T) {
dc, _ := newFakeDisruptionController()
pdb, _ := newPodDisruptionBudget(t, intstr.FromString("67%"))
add(t, dc.pdbLister.Store, pdb)
add(t, dc.pdbStore, pdb)
if err := dc.sync("notExist"); err != nil {
t.Errorf("Unexpected error: %v, expect nil", err)
}
@ -598,16 +623,16 @@ func TestUpdateDisruptedPods(t *testing.T) {
"p3": {Time: currentTime}, // Should remain, pod untouched.
"notthere": {Time: currentTime}, // Should be removed, pod deleted.
}
add(t, dc.pdbLister.Store, pdb)
add(t, dc.pdbStore, pdb)
pod1, _ := newPod(t, "p1")
pod1.DeletionTimestamp = &metav1.Time{Time: time.Now()}
pod2, _ := newPod(t, "p2")
pod3, _ := newPod(t, "p3")
add(t, dc.podLister.Indexer, pod1)
add(t, dc.podLister.Indexer, pod2)
add(t, dc.podLister.Indexer, pod3)
add(t, dc.podStore, pod1)
add(t, dc.podStore, pod2)
add(t, dc.podStore, pod3)
dc.sync(pdbName)

View File

@ -283,6 +283,8 @@ func ClusterRoles() []rbac.ClusterRole {
"persistentvolumes", "pods", "secrets", "services", "serviceaccounts", "replicationcontrollers").RuleOrDie(),
rbac.NewRule("list", "watch").Groups(extensionsGroup).Resources("daemonsets", "deployments", "replicasets").RuleOrDie(),
rbac.NewRule("list", "watch").Groups(batchGroup).Resources("jobs", "cronjobs").RuleOrDie(),
rbac.NewRule("list", "watch").Groups(appsGroup).Resources("statefulsets").RuleOrDie(),
rbac.NewRule("list", "watch").Groups(policyGroup).Resources("poddisruptionbudgets").RuleOrDie(),
},
},
{

View File

@ -475,6 +475,20 @@ items:
verbs:
- list
- watch
- apiGroups:
- apps
resources:
- statefulsets
verbs:
- list
- watch
- apiGroups:
- policy
resources:
- poddisruptionbudgets
verbs:
- list
- watch
- apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRole
metadata:

View File

@ -36,8 +36,8 @@ import (
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/apis/policy/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated"
"k8s.io/kubernetes/pkg/controller/disruption"
"k8s.io/kubernetes/pkg/controller/informers"
"k8s.io/kubernetes/test/integration/framework"
)
@ -50,14 +50,14 @@ const (
func TestConcurrentEvictionRequests(t *testing.T) {
podNameFormat := "test-pod-%d"
s, rm, podInformer, clientSet := rmSetup(t)
s, rm, informers, clientSet := rmSetup(t)
defer s.Close()
ns := framework.CreateTestingNamespace("concurrent-eviction-requests", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
stopCh := make(chan struct{})
go podInformer.Run(stopCh)
informers.Start(stopCh)
go rm.Run(stopCh)
defer close(stopCh)
@ -87,7 +87,7 @@ func TestConcurrentEvictionRequests(t *testing.T) {
}
}
waitToObservePods(t, podInformer, numOfEvictions)
waitToObservePods(t, informers.Core().V1().Pods().Informer(), numOfEvictions)
pdb := newPDB()
if _, err := clientSet.Policy().PodDisruptionBudgets(ns.Name).Create(pdb); err != nil {
@ -227,7 +227,7 @@ func newEviction(ns, evictionName string, deleteOption *metav1.DeleteOptions) *v
}
}
func rmSetup(t *testing.T) (*httptest.Server, *disruption.DisruptionController, cache.SharedIndexInformer, clientset.Interface) {
func rmSetup(t *testing.T) (*httptest.Server, *disruption.DisruptionController, informers.SharedInformerFactory, clientset.Interface) {
masterConfig := framework.NewIntegrationTestMasterConfig()
_, s := framework.RunAMaster(masterConfig)
@ -237,13 +237,18 @@ func rmSetup(t *testing.T) (*httptest.Server, *disruption.DisruptionController,
t.Fatalf("Error in create clientset: %v", err)
}
resyncPeriod := 12 * time.Hour
informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "pdb-informers")), nil, resyncPeriod)
informers := informers.NewSharedInformerFactory(nil, clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "pdb-informers")), resyncPeriod)
rm := disruption.NewDisruptionController(
informers.Pods().Informer(),
informers.Core().V1().Pods(),
informers.Policy().V1beta1().PodDisruptionBudgets(),
informers.Core().V1().ReplicationControllers(),
informers.Extensions().V1beta1().ReplicaSets(),
informers.Extensions().V1beta1().Deployments(),
informers.Apps().V1beta1().StatefulSets(),
clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "disruption-controller")),
)
return s, rm, informers.Pods().Informer(), clientSet
return s, rm, informers, clientSet
}
// wait for the podInformer to observe the pods. Call this function before