Migrate /pkg/controller/disruption to structured and contextual logging

This commit is contained in:
Mengjiao Liu 2023-07-12 11:30:45 +08:00
parent 98e7c2a751
commit 19869478c1
7 changed files with 201 additions and 164 deletions

View File

@ -38,6 +38,7 @@ func startDisruptionController(ctx context.Context, controllerContext Controller
} }
go disruption.NewDisruptionController( go disruption.NewDisruptionController(
ctx,
controllerContext.InformerFactory.Core().V1().Pods(), controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.InformerFactory.Policy().V1().PodDisruptionBudgets(), controllerContext.InformerFactory.Policy().V1().PodDisruptionBudgets(),
controllerContext.InformerFactory.Core().V1().ReplicationControllers(), controllerContext.InformerFactory.Core().V1().ReplicationControllers(),

View File

@ -38,7 +38,6 @@ contextual k8s.io/kubernetes/test/e2e/dra/.*
# this point it is easier to list the exceptions. # this point it is easier to list the exceptions.
-contextual k8s.io/kubernetes/pkg/controller/controller_ref_manager.go -contextual k8s.io/kubernetes/pkg/controller/controller_ref_manager.go
-contextual k8s.io/kubernetes/pkg/controller/controller_utils.go -contextual k8s.io/kubernetes/pkg/controller/controller_utils.go
-contextual k8s.io/kubernetes/pkg/controller/disruption/.*
-contextual k8s.io/kubernetes/pkg/controller/endpoint/.* -contextual k8s.io/kubernetes/pkg/controller/endpoint/.*
-contextual k8s.io/kubernetes/pkg/controller/endpointslicemirroring/.* -contextual k8s.io/kubernetes/pkg/controller/endpointslicemirroring/.*
-contextual k8s.io/kubernetes/pkg/controller/garbagecollector/.* -contextual k8s.io/kubernetes/pkg/controller/garbagecollector/.*

View File

@ -134,6 +134,7 @@ type controllerAndScale struct {
type podControllerFinder func(ctx context.Context, controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) type podControllerFinder func(ctx context.Context, controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error)
func NewDisruptionController( func NewDisruptionController(
ctx context.Context,
podInformer coreinformers.PodInformer, podInformer coreinformers.PodInformer,
pdbInformer policyinformers.PodDisruptionBudgetInformer, pdbInformer policyinformers.PodDisruptionBudgetInformer,
rcInformer coreinformers.ReplicationControllerInformer, rcInformer coreinformers.ReplicationControllerInformer,
@ -146,6 +147,7 @@ func NewDisruptionController(
discoveryClient discovery.DiscoveryInterface, discoveryClient discovery.DiscoveryInterface,
) *DisruptionController { ) *DisruptionController {
return NewDisruptionControllerInternal( return NewDisruptionControllerInternal(
ctx,
podInformer, podInformer,
pdbInformer, pdbInformer,
rcInformer, rcInformer,
@ -163,7 +165,7 @@ func NewDisruptionController(
// NewDisruptionControllerInternal allows to set a clock and // NewDisruptionControllerInternal allows to set a clock and
// stalePodDisruptionTimeout // stalePodDisruptionTimeout
// It is only supposed to be used by tests. // It is only supposed to be used by tests.
func NewDisruptionControllerInternal( func NewDisruptionControllerInternal(ctx context.Context,
podInformer coreinformers.PodInformer, podInformer coreinformers.PodInformer,
pdbInformer policyinformers.PodDisruptionBudgetInformer, pdbInformer policyinformers.PodDisruptionBudgetInformer,
rcInformer coreinformers.ReplicationControllerInformer, rcInformer coreinformers.ReplicationControllerInformer,
@ -177,6 +179,7 @@ func NewDisruptionControllerInternal(
clock clock.WithTicker, clock clock.WithTicker,
stalePodDisruptionTimeout time.Duration, stalePodDisruptionTimeout time.Duration,
) *DisruptionController { ) *DisruptionController {
logger := klog.FromContext(ctx)
dc := &DisruptionController{ dc := &DisruptionController{
kubeClient: kubeClient, kubeClient: kubeClient,
queue: workqueue.NewRateLimitingQueueWithDelayingInterface(workqueue.NewDelayingQueueWithCustomClock(clock, "disruption"), workqueue.DefaultControllerRateLimiter()), queue: workqueue.NewRateLimitingQueueWithDelayingInterface(workqueue.NewDelayingQueueWithCustomClock(clock, "disruption"), workqueue.DefaultControllerRateLimiter()),
@ -190,20 +193,30 @@ func NewDisruptionControllerInternal(
dc.getUpdater = func() updater { return dc.writePdbStatus } dc.getUpdater = func() updater { return dc.writePdbStatus }
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dc.addPod, AddFunc: func(obj interface{}) {
UpdateFunc: dc.updatePod, dc.addPod(logger, obj)
DeleteFunc: dc.deletePod, },
UpdateFunc: func(oldObj, newObj interface{}) {
dc.updatePod(logger, oldObj, newObj)
},
DeleteFunc: func(obj interface{}) {
dc.deletePod(logger, obj)
},
}) })
dc.podLister = podInformer.Lister() dc.podLister = podInformer.Lister()
dc.podListerSynced = podInformer.Informer().HasSynced dc.podListerSynced = podInformer.Informer().HasSynced
pdbInformer.Informer().AddEventHandler( pdbInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) {
AddFunc: dc.addDb, dc.addDB(logger, obj)
UpdateFunc: dc.updateDb,
DeleteFunc: dc.removeDb,
}, },
) UpdateFunc: func(oldObj, newObj interface{}) {
dc.updateDB(logger, oldObj, newObj)
},
DeleteFunc: func(obj interface{}) {
dc.removeDB(logger, obj)
},
})
dc.pdbLister = pdbInformer.Lister() dc.pdbLister = pdbInformer.Lister()
dc.pdbListerSynced = pdbInformer.Informer().HasSynced dc.pdbListerSynced = pdbInformer.Informer().HasSynced
@ -418,12 +431,13 @@ func verifyGroupKind(controllerRef *metav1.OwnerReference, expectedKind string,
func (dc *DisruptionController) Run(ctx context.Context) { func (dc *DisruptionController) Run(ctx context.Context) {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
logger := klog.FromContext(ctx)
// Start events processing pipeline. // Start events processing pipeline.
if dc.kubeClient != nil { if dc.kubeClient != nil {
klog.Infof("Sending events to api server.") logger.Info("Sending events to api server.")
dc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: dc.kubeClient.CoreV1().Events("")}) dc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: dc.kubeClient.CoreV1().Events("")})
} else { } else {
klog.Infof("No api server defined - no events will be sent to API server.") logger.Info("No api server defined - no events will be sent to API server.")
} }
defer dc.broadcaster.Shutdown() defer dc.broadcaster.Shutdown()
@ -431,8 +445,8 @@ func (dc *DisruptionController) Run(ctx context.Context) {
defer dc.recheckQueue.ShutDown() defer dc.recheckQueue.ShutDown()
defer dc.stalePodDisruptionQueue.ShutDown() defer dc.stalePodDisruptionQueue.ShutDown()
klog.Infof("Starting disruption controller") logger.Info("Starting disruption controller")
defer klog.Infof("Shutting down disruption controller") defer logger.Info("Shutting down disruption controller")
if !cache.WaitForNamedCacheSync("disruption", ctx.Done(), dc.podListerSynced, dc.pdbListerSynced, dc.rcListerSynced, dc.rsListerSynced, dc.dListerSynced, dc.ssListerSynced) { if !cache.WaitForNamedCacheSync("disruption", ctx.Done(), dc.podListerSynced, dc.pdbListerSynced, dc.rcListerSynced, dc.rsListerSynced, dc.dListerSynced, dc.ssListerSynced) {
return return
@ -445,68 +459,68 @@ func (dc *DisruptionController) Run(ctx context.Context) {
<-ctx.Done() <-ctx.Done()
} }
func (dc *DisruptionController) addDb(obj interface{}) { func (dc *DisruptionController) addDB(logger klog.Logger, obj interface{}) {
pdb := obj.(*policy.PodDisruptionBudget) pdb := obj.(*policy.PodDisruptionBudget)
klog.V(4).Infof("add DB %q", pdb.Name) logger.V(4).Info("Add DB", "podDisruptionBudget", klog.KObj(pdb))
dc.enqueuePdb(pdb) dc.enqueuePdb(logger, pdb)
} }
func (dc *DisruptionController) updateDb(old, cur interface{}) { func (dc *DisruptionController) updateDB(logger klog.Logger, old, cur interface{}) {
// TODO(mml) ignore updates where 'old' is equivalent to 'cur'. // TODO(mml) ignore updates where 'old' is equivalent to 'cur'.
pdb := cur.(*policy.PodDisruptionBudget) pdb := cur.(*policy.PodDisruptionBudget)
klog.V(4).Infof("update DB %q", pdb.Name) logger.V(4).Info("Update DB", "podDisruptionBudget", klog.KObj(pdb))
dc.enqueuePdb(pdb) dc.enqueuePdb(logger, pdb)
} }
func (dc *DisruptionController) removeDb(obj interface{}) { func (dc *DisruptionController) removeDB(logger klog.Logger, obj interface{}) {
pdb, ok := obj.(*policy.PodDisruptionBudget) pdb, ok := obj.(*policy.PodDisruptionBudget)
if !ok { if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown) tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok { if !ok {
klog.Errorf("Couldn't get object from tombstone %+v", obj) logger.Error(nil, "Couldn't get object from tombstone", "obj", obj)
return return
} }
pdb, ok = tombstone.Obj.(*policy.PodDisruptionBudget) pdb, ok = tombstone.Obj.(*policy.PodDisruptionBudget)
if !ok { if !ok {
klog.Errorf("Tombstone contained object that is not a pdb %+v", obj) logger.Error(nil, "Tombstone contained object that is not a PDB", "obj", obj)
return return
} }
} }
klog.V(4).Infof("remove DB %q", pdb.Name) logger.V(4).Info("Remove DB", "podDisruptionBudget", klog.KObj(pdb))
dc.enqueuePdb(pdb) dc.enqueuePdb(logger, pdb)
} }
func (dc *DisruptionController) addPod(obj interface{}) { func (dc *DisruptionController) addPod(logger klog.Logger, obj interface{}) {
pod := obj.(*v1.Pod) pod := obj.(*v1.Pod)
klog.V(4).Infof("addPod called on pod %q", pod.Name) logger.V(4).Info("AddPod called on pod", "pod", klog.KObj(pod))
pdb := dc.getPdbForPod(pod) pdb := dc.getPdbForPod(logger, pod)
if pdb == nil { if pdb == nil {
klog.V(4).Infof("No matching pdb for pod %q", pod.Name) logger.V(4).Info("No matching PDB for pod", "pod", klog.KObj(pod))
} else { } else {
klog.V(4).Infof("addPod %q -> PDB %q", pod.Name, pdb.Name) logger.V(4).Info("addPod -> PDB", "pod", klog.KObj(pod), "podDisruptionBudget", klog.KObj(pdb))
dc.enqueuePdb(pdb) dc.enqueuePdb(logger, pdb)
} }
if has, cleanAfter := dc.nonTerminatingPodHasStaleDisruptionCondition(pod); has { if has, cleanAfter := dc.nonTerminatingPodHasStaleDisruptionCondition(pod); has {
dc.enqueueStalePodDisruptionCleanup(pod, cleanAfter) dc.enqueueStalePodDisruptionCleanup(logger, pod, cleanAfter)
} }
} }
func (dc *DisruptionController) updatePod(_, cur interface{}) { func (dc *DisruptionController) updatePod(logger klog.Logger, _, cur interface{}) {
pod := cur.(*v1.Pod) pod := cur.(*v1.Pod)
klog.V(4).Infof("updatePod called on pod %q", pod.Name) logger.V(4).Info("UpdatePod called on pod", "pod", klog.KObj(pod))
pdb := dc.getPdbForPod(pod) pdb := dc.getPdbForPod(logger, pod)
if pdb == nil { if pdb == nil {
klog.V(4).Infof("No matching pdb for pod %q", pod.Name) logger.V(4).Info("No matching PDB for pod", "pod", klog.KObj(pod))
} else { } else {
klog.V(4).Infof("updatePod %q -> PDB %q", pod.Name, pdb.Name) logger.V(4).Info("updatePod -> PDB", "pod", klog.KObj(pod), "podDisruptionBudget", klog.KObj(pdb))
dc.enqueuePdb(pdb) dc.enqueuePdb(logger, pdb)
} }
if has, cleanAfter := dc.nonTerminatingPodHasStaleDisruptionCondition(pod); has { if has, cleanAfter := dc.nonTerminatingPodHasStaleDisruptionCondition(pod); has {
dc.enqueueStalePodDisruptionCleanup(pod, cleanAfter) dc.enqueueStalePodDisruptionCleanup(logger, pod, cleanAfter)
} }
} }
func (dc *DisruptionController) deletePod(obj interface{}) { func (dc *DisruptionController) deletePod(logger klog.Logger, obj interface{}) {
pod, ok := obj.(*v1.Pod) pod, ok := obj.(*v1.Pod)
// When a delete is dropped, the relist will notice a pod in the store not // When a delete is dropped, the relist will notice a pod in the store not
// in the list, leading to the insertion of a tombstone object which contains // in the list, leading to the insertion of a tombstone object which contains
@ -516,66 +530,66 @@ func (dc *DisruptionController) deletePod(obj interface{}) {
if !ok { if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown) tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok { if !ok {
klog.Errorf("Couldn't get object from tombstone %+v", obj) logger.Error(nil, "Couldn't get object from tombstone", "obj", obj)
return return
} }
pod, ok = tombstone.Obj.(*v1.Pod) pod, ok = tombstone.Obj.(*v1.Pod)
if !ok { if !ok {
klog.Errorf("Tombstone contained object that is not a pod %+v", obj) logger.Error(nil, "Tombstone contained object that is not a pod", "obj", obj)
return return
} }
} }
klog.V(4).Infof("deletePod called on pod %q", pod.Name) logger.V(4).Info("DeletePod called on pod", "pod", klog.KObj(pod))
pdb := dc.getPdbForPod(pod) pdb := dc.getPdbForPod(logger, pod)
if pdb == nil { if pdb == nil {
klog.V(4).Infof("No matching pdb for pod %q", pod.Name) logger.V(4).Info("No matching PDB for pod", "pod", klog.KObj(pod))
return return
} }
klog.V(4).Infof("deletePod %q -> PDB %q", pod.Name, pdb.Name) logger.V(4).Info("DeletePod -> PDB", "pod", klog.KObj(pod), "podDisruptionBudget", klog.KObj(pdb))
dc.enqueuePdb(pdb) dc.enqueuePdb(logger, pdb)
} }
func (dc *DisruptionController) enqueuePdb(pdb *policy.PodDisruptionBudget) { func (dc *DisruptionController) enqueuePdb(logger klog.Logger, pdb *policy.PodDisruptionBudget) {
key, err := controller.KeyFunc(pdb) key, err := controller.KeyFunc(pdb)
if err != nil { if err != nil {
klog.Errorf("Couldn't get key for PodDisruptionBudget object %+v: %v", pdb, err) logger.Error(err, "Couldn't get key for PodDisruptionBudget", "podDisruptionBudget", klog.KObj(pdb))
return return
} }
dc.queue.Add(key) dc.queue.Add(key)
} }
func (dc *DisruptionController) enqueuePdbForRecheck(pdb *policy.PodDisruptionBudget, delay time.Duration) { func (dc *DisruptionController) enqueuePdbForRecheck(logger klog.Logger, pdb *policy.PodDisruptionBudget, delay time.Duration) {
key, err := controller.KeyFunc(pdb) key, err := controller.KeyFunc(pdb)
if err != nil { if err != nil {
klog.Errorf("Couldn't get key for PodDisruptionBudget object %+v: %v", pdb, err) logger.Error(err, "Couldn't get key for PodDisruptionBudget", "podDisruptionBudget", klog.KObj(pdb))
return return
} }
dc.recheckQueue.AddAfter(key, delay) dc.recheckQueue.AddAfter(key, delay)
} }
func (dc *DisruptionController) enqueueStalePodDisruptionCleanup(pod *v1.Pod, d time.Duration) { func (dc *DisruptionController) enqueueStalePodDisruptionCleanup(logger klog.Logger, pod *v1.Pod, d time.Duration) {
key, err := controller.KeyFunc(pod) key, err := controller.KeyFunc(pod)
if err != nil { if err != nil {
klog.ErrorS(err, "Couldn't get key for Pod object", "pod", klog.KObj(pod)) logger.Error(err, "Couldn't get key for Pod object", "pod", klog.KObj(pod))
return return
} }
dc.stalePodDisruptionQueue.AddAfter(key, d) dc.stalePodDisruptionQueue.AddAfter(key, d)
klog.V(4).InfoS("Enqueued pod to cleanup stale DisruptionTarget condition", "pod", klog.KObj(pod)) logger.V(4).Info("Enqueued pod to cleanup stale DisruptionTarget condition", "pod", klog.KObj(pod))
} }
func (dc *DisruptionController) getPdbForPod(pod *v1.Pod) *policy.PodDisruptionBudget { func (dc *DisruptionController) getPdbForPod(logger klog.Logger, pod *v1.Pod) *policy.PodDisruptionBudget {
// GetPodPodDisruptionBudgets returns an error only if no // GetPodPodDisruptionBudgets returns an error only if no
// PodDisruptionBudgets are found. We don't return that as an error to the // PodDisruptionBudgets are found. We don't return that as an error to the
// caller. // caller.
pdbs, err := dc.pdbLister.GetPodPodDisruptionBudgets(pod) pdbs, err := dc.pdbLister.GetPodPodDisruptionBudgets(pod)
if err != nil { if err != nil {
klog.V(4).Infof("No PodDisruptionBudgets found for pod %v, PodDisruptionBudget controller will avoid syncing.", pod.Name) logger.V(4).Info("No PodDisruptionBudgets found for pod, PodDisruptionBudget controller will avoid syncing.", "pod", klog.KObj(pod))
return nil return nil
} }
if len(pdbs) > 1 { if len(pdbs) > 1 {
msg := fmt.Sprintf("Pod %q/%q matches multiple PodDisruptionBudgets. Chose %q arbitrarily.", pod.Namespace, pod.Name, pdbs[0].Name) msg := fmt.Sprintf("Pod %q/%q matches multiple PodDisruptionBudgets. Chose %q arbitrarily.", pod.Namespace, pod.Name, pdbs[0].Name)
klog.Warning(msg) logger.Info(msg)
dc.recorder.Event(pod, v1.EventTypeWarning, "MultiplePodDisruptionBudgets", msg) dc.recorder.Event(pod, v1.EventTypeWarning, "MultiplePodDisruptionBudgets", msg)
} }
return pdbs[0] return pdbs[0]
@ -656,9 +670,10 @@ func (dc *DisruptionController) processNextStalePodDisruptionWorkItem(ctx contex
} }
func (dc *DisruptionController) sync(ctx context.Context, key string) error { func (dc *DisruptionController) sync(ctx context.Context, key string) error {
logger := klog.FromContext(ctx)
startTime := dc.clock.Now() startTime := dc.clock.Now()
defer func() { defer func() {
klog.V(4).Infof("Finished syncing PodDisruptionBudget %q (%v)", key, dc.clock.Since(startTime)) logger.V(4).Info("Finished syncing PodDisruptionBudget", "key", key, "duration", dc.clock.Since(startTime))
}() }()
namespace, name, err := cache.SplitMetaNamespaceKey(key) namespace, name, err := cache.SplitMetaNamespaceKey(key)
@ -667,7 +682,7 @@ func (dc *DisruptionController) sync(ctx context.Context, key string) error {
} }
pdb, err := dc.pdbLister.PodDisruptionBudgets(namespace).Get(name) pdb, err := dc.pdbLister.PodDisruptionBudgets(namespace).Get(name)
if errors.IsNotFound(err) { if errors.IsNotFound(err) {
klog.V(4).Infof("PodDisruptionBudget %q has been deleted", key) logger.V(4).Info("podDisruptionBudget has been deleted", "key", key)
return nil return nil
} }
if err != nil { if err != nil {
@ -681,7 +696,7 @@ func (dc *DisruptionController) sync(ctx context.Context, key string) error {
return err return err
} }
if err != nil { if err != nil {
klog.Errorf("Failed to sync pdb %s/%s: %v", pdb.Namespace, pdb.Name, err) logger.Error(err, "Failed to sync PDB", "podDisruptionBudget", klog.KRef(namespace, name))
return dc.failSafe(ctx, pdb, err) return dc.failSafe(ctx, pdb, err)
} }
@ -689,6 +704,7 @@ func (dc *DisruptionController) sync(ctx context.Context, key string) error {
} }
func (dc *DisruptionController) trySync(ctx context.Context, pdb *policy.PodDisruptionBudget) error { func (dc *DisruptionController) trySync(ctx context.Context, pdb *policy.PodDisruptionBudget) error {
logger := klog.FromContext(ctx)
pods, err := dc.getPodsForPdb(pdb) pods, err := dc.getPodsForPdb(pdb)
if err != nil { if err != nil {
dc.recorder.Eventf(pdb, v1.EventTypeWarning, "NoPods", "Failed to get pods: %v", err) dc.recorder.Eventf(pdb, v1.EventTypeWarning, "NoPods", "Failed to get pods: %v", err)
@ -705,7 +721,7 @@ func (dc *DisruptionController) trySync(ctx context.Context, pdb *policy.PodDisr
} }
// We have unmamanged pods, instead of erroring and hotlooping in disruption controller, log and continue. // We have unmamanged pods, instead of erroring and hotlooping in disruption controller, log and continue.
if len(unmanagedPods) > 0 { if len(unmanagedPods) > 0 {
klog.Warningf("found unmanaged pods associated with this PDB: %v", unmanagedPods) logger.V(4).Info("Found unmanaged pods associated with this PDB", "pods", unmanagedPods)
dc.recorder.Eventf(pdb, v1.EventTypeWarning, "UnmanagedPods", "Pods selected by this PodDisruptionBudget (selector: %v) were found "+ dc.recorder.Eventf(pdb, v1.EventTypeWarning, "UnmanagedPods", "Pods selected by this PodDisruptionBudget (selector: %v) were found "+
"to be unmanaged. As a result, the status of the PDB cannot be calculated correctly, which may result in undefined behavior. "+ "to be unmanaged. As a result, the status of the PDB cannot be calculated correctly, which may result in undefined behavior. "+
"To account for these pods please set \".spec.minAvailable\" "+ "To account for these pods please set \".spec.minAvailable\" "+
@ -713,7 +729,7 @@ func (dc *DisruptionController) trySync(ctx context.Context, pdb *policy.PodDisr
} }
currentTime := dc.clock.Now() currentTime := dc.clock.Now()
disruptedPods, recheckTime := dc.buildDisruptedPodMap(pods, pdb, currentTime) disruptedPods, recheckTime := dc.buildDisruptedPodMap(logger, pods, pdb, currentTime)
currentHealthy := countHealthyPods(pods, disruptedPods, currentTime) currentHealthy := countHealthyPods(pods, disruptedPods, currentTime)
err = dc.updatePdbStatus(ctx, pdb, currentHealthy, desiredHealthy, expectedCount, disruptedPods) err = dc.updatePdbStatus(ctx, pdb, currentHealthy, desiredHealthy, expectedCount, disruptedPods)
@ -721,23 +737,24 @@ func (dc *DisruptionController) trySync(ctx context.Context, pdb *policy.PodDisr
// There is always at most one PDB waiting with a particular name in the queue, // There is always at most one PDB waiting with a particular name in the queue,
// and each PDB in the queue is associated with the lowest timestamp // and each PDB in the queue is associated with the lowest timestamp
// that was supplied when a PDB with that name was added. // that was supplied when a PDB with that name was added.
dc.enqueuePdbForRecheck(pdb, recheckTime.Sub(currentTime)) dc.enqueuePdbForRecheck(logger, pdb, recheckTime.Sub(currentTime))
} }
return err return err
} }
func (dc *DisruptionController) syncStalePodDisruption(ctx context.Context, key string) error { func (dc *DisruptionController) syncStalePodDisruption(ctx context.Context, key string) error {
logger := klog.FromContext(ctx)
startTime := dc.clock.Now() startTime := dc.clock.Now()
namespace, name, err := cache.SplitMetaNamespaceKey(key) namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil { if err != nil {
return err return err
} }
defer func() { defer func() {
klog.V(4).InfoS("Finished syncing Pod to clear DisruptionTarget condition", "pod", klog.KRef(namespace, name), "duration", dc.clock.Since(startTime)) logger.V(4).Info("Finished syncing Pod to clear DisruptionTarget condition", "pod", klog.KRef(namespace, name), "duration", dc.clock.Since(startTime))
}() }()
pod, err := dc.podLister.Pods(namespace).Get(name) pod, err := dc.podLister.Pods(namespace).Get(name)
if errors.IsNotFound(err) { if errors.IsNotFound(err) {
klog.V(4).InfoS("Skipping clearing DisruptionTarget condition because pod was deleted", "pod", klog.KObj(pod)) logger.V(4).Info("Skipping clearing DisruptionTarget condition because pod was deleted", "pod", klog.KObj(pod))
return nil return nil
} }
if err != nil { if err != nil {
@ -749,7 +766,7 @@ func (dc *DisruptionController) syncStalePodDisruption(ctx context.Context, key
return nil return nil
} }
if cleanAfter > 0 { if cleanAfter > 0 {
dc.enqueueStalePodDisruptionCleanup(pod, cleanAfter) dc.enqueueStalePodDisruptionCleanup(logger, pod, cleanAfter)
return nil return nil
} }
@ -765,7 +782,7 @@ func (dc *DisruptionController) syncStalePodDisruption(ctx context.Context, key
if _, err := dc.kubeClient.CoreV1().Pods(pod.Namespace).ApplyStatus(ctx, podApply, metav1.ApplyOptions{FieldManager: fieldManager, Force: true}); err != nil { if _, err := dc.kubeClient.CoreV1().Pods(pod.Namespace).ApplyStatus(ctx, podApply, metav1.ApplyOptions{FieldManager: fieldManager, Force: true}); err != nil {
return err return err
} }
klog.V(2).InfoS("Reset stale DisruptionTarget condition to False", "pod", klog.KObj(pod)) logger.V(2).Info("Reset stale DisruptionTarget condition to False", "pod", klog.KObj(pod))
return nil return nil
} }
@ -895,7 +912,7 @@ func countHealthyPods(pods []*v1.Pod, disruptedPods map[string]metav1.Time, curr
// Builds new PodDisruption map, possibly removing items that refer to non-existing, already deleted // Builds new PodDisruption map, possibly removing items that refer to non-existing, already deleted
// or not-deleted at all items. Also returns an information when this check should be repeated. // or not-deleted at all items. Also returns an information when this check should be repeated.
func (dc *DisruptionController) buildDisruptedPodMap(pods []*v1.Pod, pdb *policy.PodDisruptionBudget, currentTime time.Time) (map[string]metav1.Time, *time.Time) { func (dc *DisruptionController) buildDisruptedPodMap(logger klog.Logger, pods []*v1.Pod, pdb *policy.PodDisruptionBudget, currentTime time.Time) (map[string]metav1.Time, *time.Time) {
disruptedPods := pdb.Status.DisruptedPods disruptedPods := pdb.Status.DisruptedPods
result := make(map[string]metav1.Time) result := make(map[string]metav1.Time)
var recheckTime *time.Time var recheckTime *time.Time
@ -915,8 +932,8 @@ func (dc *DisruptionController) buildDisruptedPodMap(pods []*v1.Pod, pdb *policy
} }
expectedDeletion := disruptionTime.Time.Add(DeletionTimeout) expectedDeletion := disruptionTime.Time.Add(DeletionTimeout)
if expectedDeletion.Before(currentTime) { if expectedDeletion.Before(currentTime) {
klog.V(1).Infof("Pod %s/%s was expected to be deleted at %s but it wasn't, updating pdb %s/%s", logger.V(1).Info("pod was expected to be deleted but it wasn't, updating PDB",
pod.Namespace, pod.Name, disruptionTime.String(), pdb.Namespace, pdb.Name) "pod", klog.KObj(pod), "deletionTime", disruptionTime, "podDisruptionBudget", klog.KObj(pdb))
dc.recorder.Eventf(pod, v1.EventTypeWarning, "NotDeleted", "Pod was expected by PDB %s/%s to be deleted but it wasn't", dc.recorder.Eventf(pod, v1.EventTypeWarning, "NotDeleted", "Pod was expected by PDB %s/%s to be deleted but it wasn't",
pdb.Namespace, pdb.Namespace) pdb.Namespace, pdb.Namespace)
} else { } else {

View File

@ -18,9 +18,7 @@ package disruption
import ( import (
"context" "context"
"flag"
"fmt" "fmt"
"os"
"runtime/debug" "runtime/debug"
"strings" "strings"
"sync" "sync"
@ -51,9 +49,9 @@ import (
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
_ "k8s.io/kubernetes/pkg/apis/core/install" _ "k8s.io/kubernetes/pkg/apis/core/install"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/test/utils/ktesting"
clocktesting "k8s.io/utils/clock/testing" clocktesting "k8s.io/utils/clock/testing"
"k8s.io/utils/pointer" "k8s.io/utils/pointer"
) )
@ -149,8 +147,8 @@ var customGVK = schema.GroupVersionKind{
Kind: "customresource", Kind: "customresource",
} }
func newFakeDisruptionController() (*disruptionController, *pdbStates) { func newFakeDisruptionController(ctx context.Context) (*disruptionController, *pdbStates) {
return newFakeDisruptionControllerWithTime(context.TODO(), time.Now()) return newFakeDisruptionControllerWithTime(ctx, time.Now())
} }
func newFakeDisruptionControllerWithTime(ctx context.Context, now time.Time) (*disruptionController, *pdbStates) { func newFakeDisruptionControllerWithTime(ctx context.Context, now time.Time) (*disruptionController, *pdbStates) {
@ -168,6 +166,7 @@ func newFakeDisruptionControllerWithTime(ctx context.Context, now time.Time) (*d
fakeClock := clocktesting.NewFakeClock(now) fakeClock := clocktesting.NewFakeClock(now)
dc := NewDisruptionControllerInternal( dc := NewDisruptionControllerInternal(
ctx,
informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Pods(),
informerFactory.Policy().V1().PodDisruptionBudgets(), informerFactory.Policy().V1().PodDisruptionBudgets(),
informerFactory.Core().V1().ReplicationControllers(), informerFactory.Core().V1().ReplicationControllers(),
@ -424,14 +423,14 @@ func add(t *testing.T, store cache.Store, obj interface{}) {
// Create one with no selector. Verify it matches all pods // Create one with no selector. Verify it matches all pods
func TestNoSelector(t *testing.T) { func TestNoSelector(t *testing.T) {
dc, ps := newFakeDisruptionController() _, ctx := ktesting.NewTestContext(t)
dc, ps := newFakeDisruptionController(ctx)
pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromInt32(3)) pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromInt32(3))
pdb.Spec.Selector = &metav1.LabelSelector{} pdb.Spec.Selector = &metav1.LabelSelector{}
pod, _ := newPod(t, "yo-yo-yo") pod, _ := newPod(t, "yo-yo-yo")
add(t, dc.pdbStore, pdb) add(t, dc.pdbStore, pdb)
ctx := context.TODO()
dc.sync(ctx, pdbName) dc.sync(ctx, pdbName)
ps.VerifyPdbStatus(t, pdbName, 0, 0, 3, 0, map[string]metav1.Time{}) ps.VerifyPdbStatus(t, pdbName, 0, 0, 3, 0, map[string]metav1.Time{})
@ -443,10 +442,10 @@ func TestNoSelector(t *testing.T) {
// Verify that available/expected counts go up as we add pods, then verify that // Verify that available/expected counts go up as we add pods, then verify that
// available count goes down when we make a pod unavailable. // available count goes down when we make a pod unavailable.
func TestUnavailable(t *testing.T) { func TestUnavailable(t *testing.T) {
dc, ps := newFakeDisruptionController() _, ctx := ktesting.NewTestContext(t)
dc, ps := newFakeDisruptionController(ctx)
pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromInt32(3)) pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromInt32(3))
ctx := context.TODO()
add(t, dc.pdbStore, pdb) add(t, dc.pdbStore, pdb)
dc.sync(ctx, pdbName) dc.sync(ctx, pdbName)
@ -473,11 +472,11 @@ func TestUnavailable(t *testing.T) {
// Verify that an integer MaxUnavailable won't // Verify that an integer MaxUnavailable won't
// allow a disruption for pods with no controller. // allow a disruption for pods with no controller.
func TestIntegerMaxUnavailable(t *testing.T) { func TestIntegerMaxUnavailable(t *testing.T) {
dc, ps := newFakeDisruptionController() _, ctx := ktesting.NewTestContext(t)
dc, ps := newFakeDisruptionController(ctx)
pdb, pdbName := newMaxUnavailablePodDisruptionBudget(t, intstr.FromInt32(1)) pdb, pdbName := newMaxUnavailablePodDisruptionBudget(t, intstr.FromInt32(1))
add(t, dc.pdbStore, pdb) add(t, dc.pdbStore, pdb)
ctx := context.TODO()
dc.sync(ctx, pdbName) dc.sync(ctx, pdbName)
// This verifies that when a PDB has 0 pods, disruptions are not allowed. // This verifies that when a PDB has 0 pods, disruptions are not allowed.
ps.VerifyDisruptionAllowed(t, pdbName, 0) ps.VerifyDisruptionAllowed(t, pdbName, 0)
@ -494,7 +493,8 @@ func TestIntegerMaxUnavailable(t *testing.T) {
// Verify that an integer MaxUnavailable will recompute allowed disruptions when the scale of // Verify that an integer MaxUnavailable will recompute allowed disruptions when the scale of
// the selected pod's controller is modified. // the selected pod's controller is modified.
func TestIntegerMaxUnavailableWithScaling(t *testing.T) { func TestIntegerMaxUnavailableWithScaling(t *testing.T) {
dc, ps := newFakeDisruptionController() _, ctx := ktesting.NewTestContext(t)
dc, ps := newFakeDisruptionController(ctx)
pdb, pdbName := newMaxUnavailablePodDisruptionBudget(t, intstr.FromInt32(2)) pdb, pdbName := newMaxUnavailablePodDisruptionBudget(t, intstr.FromInt32(2))
add(t, dc.pdbStore, pdb) add(t, dc.pdbStore, pdb)
@ -504,7 +504,6 @@ func TestIntegerMaxUnavailableWithScaling(t *testing.T) {
pod, _ := newPod(t, "pod") pod, _ := newPod(t, "pod")
updatePodOwnerToRs(t, pod, rs) updatePodOwnerToRs(t, pod, rs)
ctx := context.TODO()
add(t, dc.podStore, pod) add(t, dc.podStore, pod)
dc.sync(ctx, pdbName) dc.sync(ctx, pdbName)
ps.VerifyPdbStatus(t, pdbName, 0, 1, 5, 7, map[string]metav1.Time{}) ps.VerifyPdbStatus(t, pdbName, 0, 1, 5, 7, map[string]metav1.Time{})
@ -520,7 +519,8 @@ func TestIntegerMaxUnavailableWithScaling(t *testing.T) {
// Verify that an percentage MaxUnavailable will recompute allowed disruptions when the scale of // Verify that an percentage MaxUnavailable will recompute allowed disruptions when the scale of
// the selected pod's controller is modified. // the selected pod's controller is modified.
func TestPercentageMaxUnavailableWithScaling(t *testing.T) { func TestPercentageMaxUnavailableWithScaling(t *testing.T) {
dc, ps := newFakeDisruptionController() _, ctx := ktesting.NewTestContext(t)
dc, ps := newFakeDisruptionController(ctx)
pdb, pdbName := newMaxUnavailablePodDisruptionBudget(t, intstr.FromString("30%")) pdb, pdbName := newMaxUnavailablePodDisruptionBudget(t, intstr.FromString("30%"))
add(t, dc.pdbStore, pdb) add(t, dc.pdbStore, pdb)
@ -531,7 +531,6 @@ func TestPercentageMaxUnavailableWithScaling(t *testing.T) {
pod, _ := newPod(t, "pod") pod, _ := newPod(t, "pod")
updatePodOwnerToRs(t, pod, rs) updatePodOwnerToRs(t, pod, rs)
add(t, dc.podStore, pod) add(t, dc.podStore, pod)
ctx := context.TODO()
dc.sync(ctx, pdbName) dc.sync(ctx, pdbName)
ps.VerifyPdbStatus(t, pdbName, 0, 1, 4, 7, map[string]metav1.Time{}) ps.VerifyPdbStatus(t, pdbName, 0, 1, 4, 7, map[string]metav1.Time{})
@ -546,11 +545,11 @@ func TestPercentageMaxUnavailableWithScaling(t *testing.T) {
// Create a pod with no controller, and verify that a PDB with a percentage // Create a pod with no controller, and verify that a PDB with a percentage
// specified won't allow a disruption. // specified won't allow a disruption.
func TestNakedPod(t *testing.T) { func TestNakedPod(t *testing.T) {
dc, ps := newFakeDisruptionController() _, ctx := ktesting.NewTestContext(t)
dc, ps := newFakeDisruptionController(ctx)
pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("28%")) pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("28%"))
add(t, dc.pdbStore, pdb) add(t, dc.pdbStore, pdb)
ctx := context.TODO()
dc.sync(ctx, pdbName) dc.sync(ctx, pdbName)
// This verifies that when a PDB has 0 pods, disruptions are not allowed. // This verifies that when a PDB has 0 pods, disruptions are not allowed.
ps.VerifyDisruptionAllowed(t, pdbName, 0) ps.VerifyDisruptionAllowed(t, pdbName, 0)
@ -566,11 +565,11 @@ func TestNakedPod(t *testing.T) {
// Create a pod with unsupported controller, and verify that a PDB with a percentage // Create a pod with unsupported controller, and verify that a PDB with a percentage
// specified won't allow a disruption. // specified won't allow a disruption.
func TestUnsupportedControllerPod(t *testing.T) { func TestUnsupportedControllerPod(t *testing.T) {
dc, ps := newFakeDisruptionController() _, ctx := ktesting.NewTestContext(t)
dc, ps := newFakeDisruptionController(ctx)
pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("28%")) pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("28%"))
add(t, dc.pdbStore, pdb) add(t, dc.pdbStore, pdb)
ctx := context.TODO()
dc.sync(ctx, pdbName) dc.sync(ctx, pdbName)
// This verifies that when a PDB has 0 pods, disruptions are not allowed. // This verifies that when a PDB has 0 pods, disruptions are not allowed.
ps.VerifyDisruptionAllowed(t, pdbName, 0) ps.VerifyDisruptionAllowed(t, pdbName, 0)
@ -594,11 +593,11 @@ func TestUnsupportedControllerPod(t *testing.T) {
// Verify that disruption controller is not erroring when unmanaged pods are found // Verify that disruption controller is not erroring when unmanaged pods are found
func TestStatusForUnmanagedPod(t *testing.T) { func TestStatusForUnmanagedPod(t *testing.T) {
dc, ps := newFakeDisruptionController() _, ctx := ktesting.NewTestContext(t)
dc, ps := newFakeDisruptionController(ctx)
pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("28%")) pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("28%"))
add(t, dc.pdbStore, pdb) add(t, dc.pdbStore, pdb)
ctx := context.TODO()
dc.sync(ctx, pdbName) dc.sync(ctx, pdbName)
// This verifies that when a PDB has 0 pods, disruptions are not allowed. // This verifies that when a PDB has 0 pods, disruptions are not allowed.
ps.VerifyDisruptionAllowed(t, pdbName, 0) ps.VerifyDisruptionAllowed(t, pdbName, 0)
@ -612,11 +611,11 @@ func TestStatusForUnmanagedPod(t *testing.T) {
// Check if the unmanaged pods are correctly collected or not // Check if the unmanaged pods are correctly collected or not
func TestTotalUnmanagedPods(t *testing.T) { func TestTotalUnmanagedPods(t *testing.T) {
dc, ps := newFakeDisruptionController() _, ctx := ktesting.NewTestContext(t)
dc, ps := newFakeDisruptionController(ctx)
pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("28%")) pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("28%"))
add(t, dc.pdbStore, pdb) add(t, dc.pdbStore, pdb)
ctx := context.TODO()
dc.sync(ctx, pdbName) dc.sync(ctx, pdbName)
// This verifies that when a PDB has 0 pods, disruptions are not allowed. // This verifies that when a PDB has 0 pods, disruptions are not allowed.
ps.VerifyDisruptionAllowed(t, pdbName, 0) ps.VerifyDisruptionAllowed(t, pdbName, 0)
@ -636,14 +635,14 @@ func TestTotalUnmanagedPods(t *testing.T) {
// Verify that we count the scale of a ReplicaSet even when it has no Deployment. // Verify that we count the scale of a ReplicaSet even when it has no Deployment.
func TestReplicaSet(t *testing.T) { func TestReplicaSet(t *testing.T) {
dc, ps := newFakeDisruptionController() _, ctx := ktesting.NewTestContext(t)
dc, ps := newFakeDisruptionController(ctx)
pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("20%")) pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("20%"))
add(t, dc.pdbStore, pdb) add(t, dc.pdbStore, pdb)
rs, _ := newReplicaSet(t, 10) rs, _ := newReplicaSet(t, 10)
add(t, dc.rsStore, rs) add(t, dc.rsStore, rs)
ctx := context.TODO()
pod, _ := newPod(t, "pod") pod, _ := newPod(t, "pod")
updatePodOwnerToRs(t, pod, rs) updatePodOwnerToRs(t, pod, rs)
add(t, dc.podStore, pod) add(t, dc.podStore, pod)
@ -657,7 +656,8 @@ func TestScaleResource(t *testing.T) {
pods := int32(4) pods := int32(4)
maxUnavailable := int32(5) maxUnavailable := int32(5)
dc, ps := newFakeDisruptionController() _, ctx := ktesting.NewTestContext(t)
dc, ps := newFakeDisruptionController(ctx)
dc.scaleClient.AddReactor("get", "customresources", func(action core.Action) (handled bool, ret runtime.Object, err error) { dc.scaleClient.AddReactor("get", "customresources", func(action core.Action) (handled bool, ret runtime.Object, err error) {
obj := &autoscalingapi.Scale{ obj := &autoscalingapi.Scale{
@ -688,7 +688,6 @@ func TestScaleResource(t *testing.T) {
}) })
add(t, dc.podStore, pod) add(t, dc.podStore, pod)
} }
ctx := context.TODO()
dc.sync(ctx, pdbName) dc.sync(ctx, pdbName)
disruptionsAllowed := int32(0) disruptionsAllowed := int32(0)
if replicas-pods < maxUnavailable { if replicas-pods < maxUnavailable {
@ -750,7 +749,8 @@ func TestScaleFinderNoResource(t *testing.T) {
t.Run(tn, func(t *testing.T) { t.Run(tn, func(t *testing.T) {
customResourceUID := uuid.NewUUID() customResourceUID := uuid.NewUUID()
dc, _ := newFakeDisruptionController() _, ctx := ktesting.NewTestContext(t)
dc, _ := newFakeDisruptionController(ctx)
dc.scaleClient.AddReactor("get", resourceName, func(action core.Action) (handled bool, ret runtime.Object, err error) { dc.scaleClient.AddReactor("get", resourceName, func(action core.Action) (handled bool, ret runtime.Object, err error) {
gr := schema.GroupResource{ gr := schema.GroupResource{
@ -774,7 +774,7 @@ func TestScaleFinderNoResource(t *testing.T) {
UID: customResourceUID, UID: customResourceUID,
} }
_, err := dc.getScaleController(context.TODO(), ownerRef, "default") _, err := dc.getScaleController(ctx, ownerRef, "default")
if tc.expectError && err == nil { if tc.expectError && err == nil {
t.Error("expected error, but didn't get one") t.Error("expected error, but didn't get one")
@ -790,8 +790,8 @@ func TestScaleFinderNoResource(t *testing.T) {
// Verify that multiple controllers doesn't allow the PDB to be set true. // Verify that multiple controllers doesn't allow the PDB to be set true.
func TestMultipleControllers(t *testing.T) { func TestMultipleControllers(t *testing.T) {
const podCount = 2 const podCount = 2
_, ctx := ktesting.NewTestContext(t)
dc, ps := newFakeDisruptionController() dc, ps := newFakeDisruptionController(ctx)
pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("1%")) pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("1%"))
add(t, dc.pdbStore, pdb) add(t, dc.pdbStore, pdb)
@ -802,7 +802,6 @@ func TestMultipleControllers(t *testing.T) {
pods = append(pods, pod) pods = append(pods, pod)
add(t, dc.podStore, pod) add(t, dc.podStore, pod)
} }
ctx := context.TODO()
dc.sync(ctx, pdbName) dc.sync(ctx, pdbName)
// No controllers yet => no disruption allowed // No controllers yet => no disruption allowed
@ -840,8 +839,8 @@ func TestReplicationController(t *testing.T) {
"foo": "bar", "foo": "bar",
"baz": "quux", "baz": "quux",
} }
_, ctx := ktesting.NewTestContext(t)
dc, ps := newFakeDisruptionController() dc, ps := newFakeDisruptionController(ctx)
// 34% should round up to 2 // 34% should round up to 2
pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("34%")) pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("34%"))
@ -849,7 +848,6 @@ func TestReplicationController(t *testing.T) {
rc, _ := newReplicationController(t, 3) rc, _ := newReplicationController(t, 3)
rc.Spec.Selector = labels rc.Spec.Selector = labels
add(t, dc.rcStore, rc) add(t, dc.rcStore, rc)
ctx := context.TODO()
dc.sync(ctx, pdbName) dc.sync(ctx, pdbName)
// It starts out at 0 expected because, with no pods, the PDB doesn't know // It starts out at 0 expected because, with no pods, the PDB doesn't know
@ -881,14 +879,14 @@ func TestStatefulSetController(t *testing.T) {
"baz": "quux", "baz": "quux",
} }
dc, ps := newFakeDisruptionController() _, ctx := ktesting.NewTestContext(t)
dc, ps := newFakeDisruptionController(ctx)
// 34% should round up to 2 // 34% should round up to 2
pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("34%")) pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("34%"))
add(t, dc.pdbStore, pdb) add(t, dc.pdbStore, pdb)
ss, _ := newStatefulSet(t, 3) ss, _ := newStatefulSet(t, 3)
add(t, dc.ssStore, ss) add(t, dc.ssStore, ss)
ctx := context.TODO()
dc.sync(ctx, pdbName) dc.sync(ctx, pdbName)
// It starts out at 0 expected because, with no pods, the PDB doesn't know // It starts out at 0 expected because, with no pods, the PDB doesn't know
@ -920,7 +918,8 @@ func TestTwoControllers(t *testing.T) {
"foo": "bar", "foo": "bar",
"baz": "quuux", "baz": "quuux",
} }
dc, ps := newFakeDisruptionController() _, ctx := ktesting.NewTestContext(t)
dc, ps := newFakeDisruptionController(ctx)
// These constants are related, but I avoid calculating the correct values in // These constants are related, but I avoid calculating the correct values in
// code. If you update a parameter here, recalculate the correct values for // code. If you update a parameter here, recalculate the correct values for
@ -935,7 +934,6 @@ func TestTwoControllers(t *testing.T) {
rc, _ := newReplicationController(t, collectionSize) rc, _ := newReplicationController(t, collectionSize)
rc.Spec.Selector = rcLabels rc.Spec.Selector = rcLabels
add(t, dc.rcStore, rc) add(t, dc.rcStore, rc)
ctx := context.TODO()
dc.sync(ctx, pdbName) dc.sync(ctx, pdbName)
ps.VerifyPdbStatus(t, pdbName, 0, 0, 0, 0, map[string]metav1.Time{}) ps.VerifyPdbStatus(t, pdbName, 0, 0, 0, 0, map[string]metav1.Time{})
@ -1019,16 +1017,18 @@ func TestTwoControllers(t *testing.T) {
// Test pdb doesn't exist // Test pdb doesn't exist
func TestPDBNotExist(t *testing.T) { func TestPDBNotExist(t *testing.T) {
dc, _ := newFakeDisruptionController() _, ctx := ktesting.NewTestContext(t)
dc, _ := newFakeDisruptionController(ctx)
pdb, _ := newMinAvailablePodDisruptionBudget(t, intstr.FromString("67%")) pdb, _ := newMinAvailablePodDisruptionBudget(t, intstr.FromString("67%"))
add(t, dc.pdbStore, pdb) add(t, dc.pdbStore, pdb)
if err := dc.sync(context.TODO(), "notExist"); err != nil { if err := dc.sync(ctx, "notExist"); err != nil {
t.Errorf("Unexpected error: %v, expect nil", err) t.Errorf("Unexpected error: %v, expect nil", err)
} }
} }
func TestUpdateDisruptedPods(t *testing.T) { func TestUpdateDisruptedPods(t *testing.T) {
dc, ps := newFakeDisruptionController() _, ctx := ktesting.NewTestContext(t)
dc, ps := newFakeDisruptionController(ctx)
dc.recheckQueue = workqueue.NewNamedDelayingQueue("pdb_queue") dc.recheckQueue = workqueue.NewNamedDelayingQueue("pdb_queue")
pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromInt32(1)) pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromInt32(1))
currentTime := dc.clock.Now() currentTime := dc.clock.Now()
@ -1049,13 +1049,14 @@ func TestUpdateDisruptedPods(t *testing.T) {
add(t, dc.podStore, pod2) add(t, dc.podStore, pod2)
add(t, dc.podStore, pod3) add(t, dc.podStore, pod3)
dc.sync(context.TODO(), pdbName) dc.sync(ctx, pdbName)
ps.VerifyPdbStatus(t, pdbName, 0, 1, 1, 3, map[string]metav1.Time{"p3": {Time: currentTime.Add(-time.Minute)}}) ps.VerifyPdbStatus(t, pdbName, 0, 1, 1, 3, map[string]metav1.Time{"p3": {Time: currentTime.Add(-time.Minute)}})
} }
func TestBasicFinderFunctions(t *testing.T) { func TestBasicFinderFunctions(t *testing.T) {
dc, _ := newFakeDisruptionController() _, ctx := ktesting.NewTestContext(t)
dc, _ := newFakeDisruptionController(ctx)
rs, _ := newReplicaSet(t, 10) rs, _ := newReplicaSet(t, 10)
add(t, dc.rsStore, rs) add(t, dc.rsStore, rs)
@ -1135,7 +1136,7 @@ func TestBasicFinderFunctions(t *testing.T) {
UID: tc.uid, UID: tc.uid,
} }
controllerAndScale, _ := tc.finderFunc(context.TODO(), controllerRef, metav1.NamespaceDefault) controllerAndScale, _ := tc.finderFunc(ctx, controllerRef, metav1.NamespaceDefault)
if controllerAndScale == nil { if controllerAndScale == nil {
if tc.findsScale { if tc.findsScale {
@ -1208,7 +1209,8 @@ func TestDeploymentFinderFunction(t *testing.T) {
for tn, tc := range testCases { for tn, tc := range testCases {
t.Run(tn, func(t *testing.T) { t.Run(tn, func(t *testing.T) {
dc, _ := newFakeDisruptionController() _, ctx := ktesting.NewTestContext(t)
dc, _ := newFakeDisruptionController(ctx)
dep, _ := newDeployment(t, 10) dep, _ := newDeployment(t, 10)
dep.Spec.Selector = newSel(labels) dep.Spec.Selector = newSel(labels)
@ -1233,7 +1235,7 @@ func TestDeploymentFinderFunction(t *testing.T) {
UID: rs.UID, UID: rs.UID,
} }
controllerAndScale, _ := dc.getPodDeployment(context.TODO(), controllerRef, metav1.NamespaceDefault) controllerAndScale, _ := dc.getPodDeployment(ctx, controllerRef, metav1.NamespaceDefault)
if controllerAndScale == nil { if controllerAndScale == nil {
if tc.findsScale { if tc.findsScale {
@ -1267,10 +1269,10 @@ func TestDeploymentFinderFunction(t *testing.T) {
// (C) If the DisruptionController writes DisruptionsAllowed=1 despite the // (C) If the DisruptionController writes DisruptionsAllowed=1 despite the
// resource conflict error, then there is a bug. // resource conflict error, then there is a bug.
func TestUpdatePDBStatusRetries(t *testing.T) { func TestUpdatePDBStatusRetries(t *testing.T) {
dc, _ := newFakeDisruptionController() _, ctx := ktesting.NewTestContext(t)
dc, _ := newFakeDisruptionController(ctx)
// Inject the production code over our fake impl // Inject the production code over our fake impl
dc.getUpdater = func() updater { return dc.writePdbStatus } dc.getUpdater = func() updater { return dc.writePdbStatus }
ctx := context.TODO()
// Create a PDB and 3 pods that match it. // Create a PDB and 3 pods that match it.
pdb, pdbKey := newMinAvailablePodDisruptionBudget(t, intstr.FromInt32(1)) pdb, pdbKey := newMinAvailablePodDisruptionBudget(t, intstr.FromInt32(1))
pdb, err := dc.coreClient.PolicyV1().PodDisruptionBudgets(pdb.Namespace).Create(ctx, pdb, metav1.CreateOptions{}) pdb, err := dc.coreClient.PolicyV1().PodDisruptionBudgets(pdb.Namespace).Create(ctx, pdb, metav1.CreateOptions{})
@ -1380,8 +1382,6 @@ func TestUpdatePDBStatusRetries(t *testing.T) {
} }
func TestInvalidSelectors(t *testing.T) { func TestInvalidSelectors(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
testCases := map[string]struct { testCases := map[string]struct {
labelSelector *metav1.LabelSelector labelSelector *metav1.LabelSelector
}{ }{
@ -1407,7 +1407,9 @@ func TestInvalidSelectors(t *testing.T) {
for tn, tc := range testCases { for tn, tc := range testCases {
t.Run(tn, func(t *testing.T) { t.Run(tn, func(t *testing.T) {
dc, ps := newFakeDisruptionController() _, ctx := ktesting.NewTestContext(t)
dc, ps := newFakeDisruptionController(ctx)
pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromInt32(3)) pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromInt32(3))
pdb.Spec.Selector = tc.labelSelector pdb.Spec.Selector = tc.labelSelector
@ -1536,7 +1538,8 @@ func TestStalePodDisruption(t *testing.T) {
} }
for name, tc := range cases { for name, tc := range cases {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
dc, _ := newFakeDisruptionControllerWithTime(ctx, now) dc, _ := newFakeDisruptionControllerWithTime(ctx, now)
go dc.Run(ctx) go dc.Run(ctx)
@ -1584,9 +1587,3 @@ func verifyEventEmitted(t *testing.T, dc *disruptionController, expectedEvent st
} }
} }
} }
// TestMain adds klog flags to make debugging tests easier.
func TestMain(m *testing.M) {
klog.InitFlags(flag.CommandLine)
os.Exit(m.Run())
}

View File

@ -50,13 +50,14 @@ import (
"k8s.io/kubernetes/test/integration/etcd" "k8s.io/kubernetes/test/integration/etcd"
"k8s.io/kubernetes/test/integration/framework" "k8s.io/kubernetes/test/integration/framework"
"k8s.io/kubernetes/test/integration/util" "k8s.io/kubernetes/test/integration/util"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/utils/clock" "k8s.io/utils/clock"
"k8s.io/utils/pointer" "k8s.io/utils/pointer"
) )
const stalePodDisruptionTimeout = 3 * time.Second const stalePodDisruptionTimeout = 3 * time.Second
func setup(t *testing.T) (*kubeapiservertesting.TestServer, *disruption.DisruptionController, informers.SharedInformerFactory, clientset.Interface, *apiextensionsclientset.Clientset, dynamic.Interface) { func setup(ctx context.Context, t *testing.T) (*kubeapiservertesting.TestServer, *disruption.DisruptionController, informers.SharedInformerFactory, clientset.Interface, *apiextensionsclientset.Clientset, dynamic.Interface) {
server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins", "ServiceAccount"}, framework.SharedEtcd()) server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins", "ServiceAccount"}, framework.SharedEtcd())
clientSet, err := clientset.NewForConfig(server.ClientConfig) clientSet, err := clientset.NewForConfig(server.ClientConfig)
@ -88,6 +89,7 @@ func setup(t *testing.T) (*kubeapiservertesting.TestServer, *disruption.Disrupti
} }
pdbc := disruption.NewDisruptionControllerInternal( pdbc := disruption.NewDisruptionControllerInternal(
ctx,
informers.Core().V1().Pods(), informers.Core().V1().Pods(),
informers.Policy().V1().PodDisruptionBudgets(), informers.Policy().V1().PodDisruptionBudgets(),
informers.Core().V1().ReplicationControllers(), informers.Core().V1().ReplicationControllers(),
@ -105,11 +107,13 @@ func setup(t *testing.T) (*kubeapiservertesting.TestServer, *disruption.Disrupti
} }
func TestPDBWithScaleSubresource(t *testing.T) { func TestPDBWithScaleSubresource(t *testing.T) {
s, pdbc, informers, clientSet, apiExtensionClient, dynamicClient := setup(t) _, ctx := ktesting.NewTestContext(t)
defer s.TearDownFn() ctx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithCancel(context.Background()) s, pdbc, informers, clientSet, apiExtensionClient, dynamicClient := setup(ctx, t)
defer s.TearDownFn()
defer cancel() defer cancel()
nsName := "pdb-scale-subresource" nsName := "pdb-scale-subresource"
createNs(ctx, t, nsName, clientSet) createNs(ctx, t, nsName, clientSet)
@ -238,10 +242,11 @@ func TestEmptySelector(t *testing.T) {
for i, tc := range testcases { for i, tc := range testcases {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
s, pdbc, informers, clientSet, _, _ := setup(t) _, ctx := ktesting.NewTestContext(t)
defer s.TearDownFn() ctx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithCancel(context.Background()) s, pdbc, informers, clientSet, _, _ := setup(ctx, t)
defer s.TearDownFn()
defer cancel() defer cancel()
nsName := fmt.Sprintf("pdb-empty-selector-%d", i) nsName := fmt.Sprintf("pdb-empty-selector-%d", i)
@ -354,10 +359,11 @@ func TestSelectorsForPodsWithoutLabels(t *testing.T) {
for i, tc := range testcases { for i, tc := range testcases {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
s, pdbc, informers, clientSet, _, _ := setup(t) _, ctx := ktesting.NewTestContext(t)
defer s.TearDownFn() ctx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithCancel(context.Background()) s, pdbc, informers, clientSet, _, _ := setup(ctx, t)
defer s.TearDownFn()
defer cancel() defer cancel()
nsName := fmt.Sprintf("pdb-selectors-%d", i) nsName := fmt.Sprintf("pdb-selectors-%d", i)
@ -508,12 +514,12 @@ func waitToObservePods(t *testing.T, podInformer cache.SharedIndexInformer, podN
} }
func TestPatchCompatibility(t *testing.T) { func TestPatchCompatibility(t *testing.T) {
s, pdbc, _, clientSet, _, _ := setup(t) ctx, cancel := context.WithCancel(context.Background())
defer s.TearDownFn()
s, pdbc, _, clientSet, _, _ := setup(ctx, t)
defer s.TearDownFn()
// Even though pdbc isn't used in this test, its creation is already // Even though pdbc isn't used in this test, its creation is already
// spawning some goroutines. So we need to run it to ensure they won't leak. // spawning some goroutines. So we need to run it to ensure they won't leak.
ctx, cancel := context.WithCancel(context.Background())
cancel() cancel()
pdbc.Run(ctx) pdbc.Run(ctx)
@ -653,10 +659,11 @@ func TestPatchCompatibility(t *testing.T) {
} }
func TestStalePodDisruption(t *testing.T) { func TestStalePodDisruption(t *testing.T) {
s, pdbc, informers, clientSet, _, _ := setup(t) _, ctx := ktesting.NewTestContext(t)
defer s.TearDownFn() ctx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithCancel(context.Background()) s, pdbc, informers, clientSet, _, _ := setup(ctx, t)
defer s.TearDownFn()
defer cancel() defer cancel()
nsName := "pdb-stale-pod-disruption" nsName := "pdb-stale-pod-disruption"

View File

@ -57,6 +57,7 @@ import (
"k8s.io/kubernetes/pkg/controller/disruption" "k8s.io/kubernetes/pkg/controller/disruption"
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/test/integration/framework" "k8s.io/kubernetes/test/integration/framework"
"k8s.io/kubernetes/test/utils/ktesting"
) )
const ( const (
@ -68,14 +69,16 @@ const (
func TestConcurrentEvictionRequests(t *testing.T) { func TestConcurrentEvictionRequests(t *testing.T) {
podNameFormat := "test-pod-%d" podNameFormat := "test-pod-%d"
closeFn, rm, informers, _, clientSet := rmSetup(t) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
closeFn, rm, informers, _, clientSet := rmSetup(ctx, t)
defer closeFn() defer closeFn()
ns := framework.CreateNamespaceOrDie(clientSet, "concurrent-eviction-requests", t) ns := framework.CreateNamespaceOrDie(clientSet, "concurrent-eviction-requests", t)
defer framework.DeleteNamespaceOrDie(clientSet, ns, t) defer framework.DeleteNamespaceOrDie(clientSet, ns, t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
informers.Start(ctx.Done()) informers.Start(ctx.Done())
go rm.Run(ctx) go rm.Run(ctx)
@ -181,14 +184,16 @@ func TestConcurrentEvictionRequests(t *testing.T) {
// TestTerminalPodEviction ensures that PDB is not checked for terminal pods. // TestTerminalPodEviction ensures that PDB is not checked for terminal pods.
func TestTerminalPodEviction(t *testing.T) { func TestTerminalPodEviction(t *testing.T) {
closeFn, rm, informers, _, clientSet := rmSetup(t) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
closeFn, rm, informers, _, clientSet := rmSetup(ctx, t)
defer closeFn() defer closeFn()
ns := framework.CreateNamespaceOrDie(clientSet, "terminalpod-eviction", t) ns := framework.CreateNamespaceOrDie(clientSet, "terminalpod-eviction", t)
defer framework.DeleteNamespaceOrDie(clientSet, ns, t) defer framework.DeleteNamespaceOrDie(clientSet, ns, t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
informers.Start(ctx.Done()) informers.Start(ctx.Done())
go rm.Run(ctx) go rm.Run(ctx)
@ -255,11 +260,13 @@ func TestTerminalPodEviction(t *testing.T) {
// TestEvictionVersions ensures the eviction endpoint accepts and returns the correct API versions // TestEvictionVersions ensures the eviction endpoint accepts and returns the correct API versions
func TestEvictionVersions(t *testing.T) { func TestEvictionVersions(t *testing.T) {
closeFn, rm, informers, config, clientSet := rmSetup(t) _, ctx := ktesting.NewTestContext(t)
defer closeFn() ctx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithCancel(context.Background()) closeFn, rm, informers, config, clientSet := rmSetup(ctx, t)
defer closeFn()
defer cancel() defer cancel()
informers.Start(ctx.Done()) informers.Start(ctx.Done())
go rm.Run(ctx) go rm.Run(ctx)
@ -379,15 +386,17 @@ func TestEvictionWithFinalizers(t *testing.T) {
} }
for name, tc := range cases { for name, tc := range cases {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
closeFn, rm, informers, _, clientSet := rmSetup(t) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
closeFn, rm, informers, _, clientSet := rmSetup(ctx, t)
defer closeFn() defer closeFn()
ns := framework.CreateNamespaceOrDie(clientSet, "eviction-with-finalizers", t) ns := framework.CreateNamespaceOrDie(clientSet, "eviction-with-finalizers", t)
defer framework.DeleteNamespaceOrDie(clientSet, ns, t) defer framework.DeleteNamespaceOrDie(clientSet, ns, t)
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodDisruptionConditions, tc.enablePodDisruptionConditions)() defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodDisruptionConditions, tc.enablePodDisruptionConditions)()
ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
informers.Start(ctx.Done()) informers.Start(ctx.Done())
go rm.Run(ctx) go rm.Run(ctx)
@ -465,15 +474,17 @@ func TestEvictionWithUnhealthyPodEvictionPolicy(t *testing.T) {
} }
for name, tc := range cases { for name, tc := range cases {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PDBUnhealthyPodEvictionPolicy, tc.enableUnhealthyPodEvictionPolicy)() defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PDBUnhealthyPodEvictionPolicy, tc.enableUnhealthyPodEvictionPolicy)()
closeFn, rm, informers, _, clientSet := rmSetup(t) closeFn, rm, informers, _, clientSet := rmSetup(ctx, t)
defer closeFn() defer closeFn()
ns := framework.CreateNamespaceOrDie(clientSet, "eviction-with-pdb-pod-healthy-policy", t) ns := framework.CreateNamespaceOrDie(clientSet, "eviction-with-pdb-pod-healthy-policy", t)
defer framework.DeleteNamespaceOrDie(clientSet, ns, t) defer framework.DeleteNamespaceOrDie(clientSet, ns, t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
informers.Start(ctx.Done()) informers.Start(ctx.Done())
go rm.Run(ctx) go rm.Run(ctx)
@ -559,13 +570,15 @@ func TestEvictionWithPrecondition(t *testing.T) {
} }
for name, tc := range cases { for name, tc := range cases {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
closeFn, rm, informers, _, clientSet := rmSetup(t) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
closeFn, rm, informers, _, clientSet := rmSetup(ctx, t)
defer closeFn() defer closeFn()
ns := framework.CreateNamespaceOrDie(clientSet, "eviction-with-preconditions", t) ns := framework.CreateNamespaceOrDie(clientSet, "eviction-with-preconditions", t)
defer framework.DeleteNamespaceOrDie(clientSet, ns, t) defer framework.DeleteNamespaceOrDie(clientSet, ns, t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
informers.Start(ctx.Done()) informers.Start(ctx.Done())
go rm.Run(ctx) go rm.Run(ctx)
@ -685,7 +698,7 @@ func newV1Eviction(ns, evictionName string, deleteOption metav1.DeleteOptions) *
} }
} }
func rmSetup(t *testing.T) (kubeapiservertesting.TearDownFunc, *disruption.DisruptionController, informers.SharedInformerFactory, *restclient.Config, clientset.Interface) { func rmSetup(ctx context.Context, t *testing.T) (kubeapiservertesting.TearDownFunc, *disruption.DisruptionController, informers.SharedInformerFactory, *restclient.Config, clientset.Interface) {
// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd()) server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
@ -709,6 +722,7 @@ func rmSetup(t *testing.T) (kubeapiservertesting.TearDownFunc, *disruption.Disru
} }
rm := disruption.NewDisruptionController( rm := disruption.NewDisruptionController(
ctx,
informers.Core().V1().Pods(), informers.Core().V1().Pods(),
informers.Policy().V1().PodDisruptionBudgets(), informers.Policy().V1().PodDisruptionBudgets(),
informers.Core().V1().ReplicationControllers(), informers.Core().V1().ReplicationControllers(),

View File

@ -605,6 +605,7 @@ func PodScheduled(c clientset.Interface, podNamespace, podName string) wait.Cond
// InitDisruptionController initializes and runs a Disruption Controller to properly // InitDisruptionController initializes and runs a Disruption Controller to properly
// update PodDisuptionBudget objects. // update PodDisuptionBudget objects.
func InitDisruptionController(t *testing.T, testCtx *TestContext) *disruption.DisruptionController { func InitDisruptionController(t *testing.T, testCtx *TestContext) *disruption.DisruptionController {
_, ctx := ktesting.NewTestContext(t)
informers := informers.NewSharedInformerFactory(testCtx.ClientSet, 12*time.Hour) informers := informers.NewSharedInformerFactory(testCtx.ClientSet, 12*time.Hour)
discoveryClient := cacheddiscovery.NewMemCacheClient(testCtx.ClientSet.Discovery()) discoveryClient := cacheddiscovery.NewMemCacheClient(testCtx.ClientSet.Discovery())
@ -618,6 +619,7 @@ func InitDisruptionController(t *testing.T, testCtx *TestContext) *disruption.Di
} }
dc := disruption.NewDisruptionController( dc := disruption.NewDisruptionController(
ctx,
informers.Core().V1().Pods(), informers.Core().V1().Pods(),
informers.Policy().V1().PodDisruptionBudgets(), informers.Policy().V1().PodDisruptionBudgets(),
informers.Core().V1().ReplicationControllers(), informers.Core().V1().ReplicationControllers(),