Merge pull request #119147 from mengjiao-liu/contextual-logging-controller-disruption
Migrate /pkg/controller/disruption to structured and contextual logging
commit 745cfa35bd
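The diff below follows one mechanical pattern: free functions of the klog package (klog.Infof, klog.Errorf, klog.V(n).InfoS, ...) are replaced by calls on a logger that is passed in explicitly or retrieved from the context, and format strings give way to structured key/value pairs. As orientation, a minimal, self-contained sketch of that pattern (not code from this PR; fakePDB, addDB and run are illustrative stand-ins):

// Sketch only: shows the contextual-logging style this PR adopts.
package main

import (
	"context"

	"k8s.io/klog/v2"
)

// fakePDB stands in for *policy.PodDisruptionBudget; klog.KObj only needs
// GetName/GetNamespace, so any object with those methods works here.
type fakePDB struct{ namespace, name string }

func (o fakePDB) GetName() string      { return o.name }
func (o fakePDB) GetNamespace() string { return o.namespace }

// addDB mirrors the new handler shape: the logger comes in as an argument
// instead of being pulled from klog's globals, and values are logged as
// key/value pairs rather than interpolated into a format string.
func addDB(logger klog.Logger, pdb fakePDB) {
	logger.V(4).Info("Add DB", "podDisruptionBudget", klog.KObj(pdb))
}

func run(ctx context.Context) {
	// Library code retrieves the logger that the caller stored in the context.
	logger := klog.FromContext(ctx)
	logger.Info("Starting disruption controller")
	addDB(logger, fakePDB{namespace: "default", name: "my-pdb"})
}

func main() {
	// The caller (controller manager, test, ...) seeds the context with a logger.
	ctx := klog.NewContext(context.Background(), klog.Background())
	run(ctx)
}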
@@ -38,6 +38,7 @@ func startDisruptionController(ctx context.Context, controllerContext Controller
 	}
 
 	go disruption.NewDisruptionController(
+		ctx,
 		controllerContext.InformerFactory.Core().V1().Pods(),
 		controllerContext.InformerFactory.Policy().V1().PodDisruptionBudgets(),
 		controllerContext.InformerFactory.Core().V1().ReplicationControllers(),
@@ -38,7 +38,6 @@ contextual k8s.io/kubernetes/test/e2e/dra/.*
 # this point it is easier to list the exceptions.
 -contextual k8s.io/kubernetes/pkg/controller/controller_ref_manager.go
 -contextual k8s.io/kubernetes/pkg/controller/controller_utils.go
--contextual k8s.io/kubernetes/pkg/controller/disruption/.*
 -contextual k8s.io/kubernetes/pkg/controller/endpoint/.*
 -contextual k8s.io/kubernetes/pkg/controller/endpointslicemirroring/.*
 -contextual k8s.io/kubernetes/pkg/controller/garbagecollector/.*
@@ -134,6 +134,7 @@ type controllerAndScale struct {
 type podControllerFinder func(ctx context.Context, controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error)
 
 func NewDisruptionController(
+	ctx context.Context,
 	podInformer coreinformers.PodInformer,
 	pdbInformer policyinformers.PodDisruptionBudgetInformer,
 	rcInformer coreinformers.ReplicationControllerInformer,
@@ -146,6 +147,7 @@ func NewDisruptionController(
 	discoveryClient discovery.DiscoveryInterface,
 ) *DisruptionController {
 	return NewDisruptionControllerInternal(
+		ctx,
 		podInformer,
 		pdbInformer,
 		rcInformer,
@@ -163,7 +165,7 @@ func NewDisruptionController(
 // NewDisruptionControllerInternal allows to set a clock and
 // stalePodDisruptionTimeout
 // It is only supposed to be used by tests.
-func NewDisruptionControllerInternal(
+func NewDisruptionControllerInternal(ctx context.Context,
 	podInformer coreinformers.PodInformer,
 	pdbInformer policyinformers.PodDisruptionBudgetInformer,
 	rcInformer coreinformers.ReplicationControllerInformer,
@@ -177,6 +179,7 @@ func NewDisruptionControllerInternal(
 	clock clock.WithTicker,
 	stalePodDisruptionTimeout time.Duration,
 ) *DisruptionController {
+	logger := klog.FromContext(ctx)
 	dc := &DisruptionController{
 		kubeClient: kubeClient,
 		queue:      workqueue.NewRateLimitingQueueWithDelayingInterface(workqueue.NewDelayingQueueWithCustomClock(clock, "disruption"), workqueue.DefaultControllerRateLimiter()),
@@ -190,20 +193,30 @@ func NewDisruptionControllerInternal(
 	dc.getUpdater = func() updater { return dc.writePdbStatus }
 
 	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
-		AddFunc:    dc.addPod,
-		UpdateFunc: dc.updatePod,
-		DeleteFunc: dc.deletePod,
+		AddFunc: func(obj interface{}) {
+			dc.addPod(logger, obj)
+		},
+		UpdateFunc: func(oldObj, newObj interface{}) {
+			dc.updatePod(logger, oldObj, newObj)
+		},
+		DeleteFunc: func(obj interface{}) {
+			dc.deletePod(logger, obj)
+		},
 	})
 	dc.podLister = podInformer.Lister()
 	dc.podListerSynced = podInformer.Informer().HasSynced
 
-	pdbInformer.Informer().AddEventHandler(
-		cache.ResourceEventHandlerFuncs{
-			AddFunc:    dc.addDb,
-			UpdateFunc: dc.updateDb,
-			DeleteFunc: dc.removeDb,
-		},
-	)
+	pdbInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			dc.addDB(logger, obj)
+		},
+		UpdateFunc: func(oldObj, newObj interface{}) {
+			dc.updateDB(logger, oldObj, newObj)
+		},
+		DeleteFunc: func(obj interface{}) {
+			dc.removeDB(logger, obj)
+		},
+	})
 	dc.pdbLister = pdbInformer.Lister()
 	dc.pdbListerSynced = pdbInformer.Informer().HasSynced
 
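The hunk above is the one structural (rather than line-for-line) change in the file: the informer callbacks used to be method values with client-go's fixed signatures, and now that addPod, updatePod, deletePod, addDB, updateDB and removeDB take a logger as their first argument, the registration wraps them in closures that capture the constructor's logger. A small stand-alone sketch of that adapter pattern; handlerFuncs here is only a stand-in for client-go's cache.ResourceEventHandlerFuncs, not the real type:

// Sketch only: adapting logger-taking methods to plain func(obj interface{}) callbacks.
package main

import (
	"context"
	"fmt"

	"k8s.io/klog/v2"
)

// handlerFuncs mimics the shape of an event-handler struct whose callbacks
// receive only the object, never a logger.
type handlerFuncs struct {
	AddFunc    func(obj interface{})
	DeleteFunc func(obj interface{})
}

type controller struct{}

// addPod and deletePod mirror the new handler signatures: logger first, object second.
func (c *controller) addPod(logger klog.Logger, obj interface{}) {
	logger.V(4).Info("AddPod called", "objType", fmt.Sprintf("%T", obj))
}

func (c *controller) deletePod(logger klog.Logger, obj interface{}) {
	logger.V(4).Info("DeletePod called", "objType", fmt.Sprintf("%T", obj))
}

func main() {
	ctx := klog.NewContext(context.Background(), klog.Background())
	logger := klog.FromContext(ctx)
	c := &controller{}

	// The registration only knows func(obj interface{}); the closures inject the logger.
	h := handlerFuncs{
		AddFunc:    func(obj interface{}) { c.addPod(logger, obj) },
		DeleteFunc: func(obj interface{}) { c.deletePod(logger, obj) },
	}
	h.AddFunc("pod-a")    // messages appear when running with verbosity -v=4
	h.DeleteFunc("pod-a")
}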
@@ -418,12 +431,13 @@ func verifyGroupKind(controllerRef *metav1.OwnerReference, expectedKind string,
 func (dc *DisruptionController) Run(ctx context.Context) {
 	defer utilruntime.HandleCrash()
 
+	logger := klog.FromContext(ctx)
 	// Start events processing pipeline.
 	if dc.kubeClient != nil {
-		klog.Infof("Sending events to api server.")
+		logger.Info("Sending events to api server.")
 		dc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: dc.kubeClient.CoreV1().Events("")})
 	} else {
-		klog.Infof("No api server defined - no events will be sent to API server.")
+		logger.Info("No api server defined - no events will be sent to API server.")
 	}
 	defer dc.broadcaster.Shutdown()
 
@@ -431,8 +445,8 @@ func (dc *DisruptionController) Run(ctx context.Context) {
 	defer dc.recheckQueue.ShutDown()
 	defer dc.stalePodDisruptionQueue.ShutDown()
 
-	klog.Infof("Starting disruption controller")
-	defer klog.Infof("Shutting down disruption controller")
+	logger.Info("Starting disruption controller")
+	defer logger.Info("Shutting down disruption controller")
 
 	if !cache.WaitForNamedCacheSync("disruption", ctx.Done(), dc.podListerSynced, dc.pdbListerSynced, dc.rcListerSynced, dc.rsListerSynced, dc.dListerSynced, dc.ssListerSynced) {
 		return
@@ -445,68 +459,68 @@ func (dc *DisruptionController) Run(ctx context.Context) {
 	<-ctx.Done()
 }
 
-func (dc *DisruptionController) addDb(obj interface{}) {
+func (dc *DisruptionController) addDB(logger klog.Logger, obj interface{}) {
 	pdb := obj.(*policy.PodDisruptionBudget)
-	klog.V(4).Infof("add DB %q", pdb.Name)
-	dc.enqueuePdb(pdb)
+	logger.V(4).Info("Add DB", "podDisruptionBudget", klog.KObj(pdb))
+	dc.enqueuePdb(logger, pdb)
 }
 
-func (dc *DisruptionController) updateDb(old, cur interface{}) {
+func (dc *DisruptionController) updateDB(logger klog.Logger, old, cur interface{}) {
 	// TODO(mml) ignore updates where 'old' is equivalent to 'cur'.
 	pdb := cur.(*policy.PodDisruptionBudget)
-	klog.V(4).Infof("update DB %q", pdb.Name)
-	dc.enqueuePdb(pdb)
+	logger.V(4).Info("Update DB", "podDisruptionBudget", klog.KObj(pdb))
+	dc.enqueuePdb(logger, pdb)
 }
 
-func (dc *DisruptionController) removeDb(obj interface{}) {
+func (dc *DisruptionController) removeDB(logger klog.Logger, obj interface{}) {
 	pdb, ok := obj.(*policy.PodDisruptionBudget)
 	if !ok {
 		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
 		if !ok {
-			klog.Errorf("Couldn't get object from tombstone %+v", obj)
+			logger.Error(nil, "Couldn't get object from tombstone", "obj", obj)
 			return
 		}
 		pdb, ok = tombstone.Obj.(*policy.PodDisruptionBudget)
 		if !ok {
-			klog.Errorf("Tombstone contained object that is not a pdb %+v", obj)
+			logger.Error(nil, "Tombstone contained object that is not a PDB", "obj", obj)
 			return
 		}
 	}
-	klog.V(4).Infof("remove DB %q", pdb.Name)
-	dc.enqueuePdb(pdb)
+	logger.V(4).Info("Remove DB", "podDisruptionBudget", klog.KObj(pdb))
+	dc.enqueuePdb(logger, pdb)
 }
 
-func (dc *DisruptionController) addPod(obj interface{}) {
+func (dc *DisruptionController) addPod(logger klog.Logger, obj interface{}) {
 	pod := obj.(*v1.Pod)
-	klog.V(4).Infof("addPod called on pod %q", pod.Name)
-	pdb := dc.getPdbForPod(pod)
+	logger.V(4).Info("AddPod called on pod", "pod", klog.KObj(pod))
+	pdb := dc.getPdbForPod(logger, pod)
 	if pdb == nil {
-		klog.V(4).Infof("No matching pdb for pod %q", pod.Name)
+		logger.V(4).Info("No matching PDB for pod", "pod", klog.KObj(pod))
 	} else {
-		klog.V(4).Infof("addPod %q -> PDB %q", pod.Name, pdb.Name)
-		dc.enqueuePdb(pdb)
+		logger.V(4).Info("addPod -> PDB", "pod", klog.KObj(pod), "podDisruptionBudget", klog.KObj(pdb))
+		dc.enqueuePdb(logger, pdb)
 	}
 	if has, cleanAfter := dc.nonTerminatingPodHasStaleDisruptionCondition(pod); has {
-		dc.enqueueStalePodDisruptionCleanup(pod, cleanAfter)
+		dc.enqueueStalePodDisruptionCleanup(logger, pod, cleanAfter)
 	}
 }
 
-func (dc *DisruptionController) updatePod(_, cur interface{}) {
+func (dc *DisruptionController) updatePod(logger klog.Logger, _, cur interface{}) {
 	pod := cur.(*v1.Pod)
-	klog.V(4).Infof("updatePod called on pod %q", pod.Name)
-	pdb := dc.getPdbForPod(pod)
+	logger.V(4).Info("UpdatePod called on pod", "pod", klog.KObj(pod))
+	pdb := dc.getPdbForPod(logger, pod)
 	if pdb == nil {
-		klog.V(4).Infof("No matching pdb for pod %q", pod.Name)
+		logger.V(4).Info("No matching PDB for pod", "pod", klog.KObj(pod))
 	} else {
-		klog.V(4).Infof("updatePod %q -> PDB %q", pod.Name, pdb.Name)
-		dc.enqueuePdb(pdb)
+		logger.V(4).Info("updatePod -> PDB", "pod", klog.KObj(pod), "podDisruptionBudget", klog.KObj(pdb))
+		dc.enqueuePdb(logger, pdb)
 	}
 	if has, cleanAfter := dc.nonTerminatingPodHasStaleDisruptionCondition(pod); has {
-		dc.enqueueStalePodDisruptionCleanup(pod, cleanAfter)
+		dc.enqueueStalePodDisruptionCleanup(logger, pod, cleanAfter)
 	}
 }
 
-func (dc *DisruptionController) deletePod(obj interface{}) {
+func (dc *DisruptionController) deletePod(logger klog.Logger, obj interface{}) {
 	pod, ok := obj.(*v1.Pod)
 	// When a delete is dropped, the relist will notice a pod in the store not
 	// in the list, leading to the insertion of a tombstone object which contains
@@ -516,66 +530,66 @@ func (dc *DisruptionController) deletePod(obj interface{}) {
 	if !ok {
 		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
 		if !ok {
-			klog.Errorf("Couldn't get object from tombstone %+v", obj)
+			logger.Error(nil, "Couldn't get object from tombstone", "obj", obj)
 			return
 		}
 		pod, ok = tombstone.Obj.(*v1.Pod)
 		if !ok {
-			klog.Errorf("Tombstone contained object that is not a pod %+v", obj)
+			logger.Error(nil, "Tombstone contained object that is not a pod", "obj", obj)
 			return
 		}
 	}
-	klog.V(4).Infof("deletePod called on pod %q", pod.Name)
-	pdb := dc.getPdbForPod(pod)
+	logger.V(4).Info("DeletePod called on pod", "pod", klog.KObj(pod))
+	pdb := dc.getPdbForPod(logger, pod)
 	if pdb == nil {
-		klog.V(4).Infof("No matching pdb for pod %q", pod.Name)
+		logger.V(4).Info("No matching PDB for pod", "pod", klog.KObj(pod))
 		return
 	}
-	klog.V(4).Infof("deletePod %q -> PDB %q", pod.Name, pdb.Name)
-	dc.enqueuePdb(pdb)
+	logger.V(4).Info("DeletePod -> PDB", "pod", klog.KObj(pod), "podDisruptionBudget", klog.KObj(pdb))
+	dc.enqueuePdb(logger, pdb)
 }
 
-func (dc *DisruptionController) enqueuePdb(pdb *policy.PodDisruptionBudget) {
+func (dc *DisruptionController) enqueuePdb(logger klog.Logger, pdb *policy.PodDisruptionBudget) {
 	key, err := controller.KeyFunc(pdb)
 	if err != nil {
-		klog.Errorf("Couldn't get key for PodDisruptionBudget object %+v: %v", pdb, err)
+		logger.Error(err, "Couldn't get key for PodDisruptionBudget", "podDisruptionBudget", klog.KObj(pdb))
 		return
 	}
 	dc.queue.Add(key)
 }
 
-func (dc *DisruptionController) enqueuePdbForRecheck(pdb *policy.PodDisruptionBudget, delay time.Duration) {
+func (dc *DisruptionController) enqueuePdbForRecheck(logger klog.Logger, pdb *policy.PodDisruptionBudget, delay time.Duration) {
 	key, err := controller.KeyFunc(pdb)
 	if err != nil {
-		klog.Errorf("Couldn't get key for PodDisruptionBudget object %+v: %v", pdb, err)
+		logger.Error(err, "Couldn't get key for PodDisruptionBudget", "podDisruptionBudget", klog.KObj(pdb))
 		return
 	}
 	dc.recheckQueue.AddAfter(key, delay)
 }
 
-func (dc *DisruptionController) enqueueStalePodDisruptionCleanup(pod *v1.Pod, d time.Duration) {
+func (dc *DisruptionController) enqueueStalePodDisruptionCleanup(logger klog.Logger, pod *v1.Pod, d time.Duration) {
 	key, err := controller.KeyFunc(pod)
 	if err != nil {
-		klog.ErrorS(err, "Couldn't get key for Pod object", "pod", klog.KObj(pod))
+		logger.Error(err, "Couldn't get key for Pod object", "pod", klog.KObj(pod))
 		return
 	}
 	dc.stalePodDisruptionQueue.AddAfter(key, d)
-	klog.V(4).InfoS("Enqueued pod to cleanup stale DisruptionTarget condition", "pod", klog.KObj(pod))
+	logger.V(4).Info("Enqueued pod to cleanup stale DisruptionTarget condition", "pod", klog.KObj(pod))
 }
 
-func (dc *DisruptionController) getPdbForPod(pod *v1.Pod) *policy.PodDisruptionBudget {
+func (dc *DisruptionController) getPdbForPod(logger klog.Logger, pod *v1.Pod) *policy.PodDisruptionBudget {
 	// GetPodPodDisruptionBudgets returns an error only if no
 	// PodDisruptionBudgets are found. We don't return that as an error to the
 	// caller.
 	pdbs, err := dc.pdbLister.GetPodPodDisruptionBudgets(pod)
 	if err != nil {
-		klog.V(4).Infof("No PodDisruptionBudgets found for pod %v, PodDisruptionBudget controller will avoid syncing.", pod.Name)
+		logger.V(4).Info("No PodDisruptionBudgets found for pod, PodDisruptionBudget controller will avoid syncing.", "pod", klog.KObj(pod))
 		return nil
 	}
 
 	if len(pdbs) > 1 {
 		msg := fmt.Sprintf("Pod %q/%q matches multiple PodDisruptionBudgets. Chose %q arbitrarily.", pod.Namespace, pod.Name, pdbs[0].Name)
-		klog.Warning(msg)
+		logger.Info(msg)
 		dc.recorder.Event(pod, v1.EventTypeWarning, "MultiplePodDisruptionBudgets", msg)
 	}
 	return pdbs[0]
@@ -656,9 +670,10 @@ func (dc *DisruptionController) processNextStalePodDisruptionWorkItem(ctx contex
 }
 
 func (dc *DisruptionController) sync(ctx context.Context, key string) error {
+	logger := klog.FromContext(ctx)
 	startTime := dc.clock.Now()
 	defer func() {
-		klog.V(4).Infof("Finished syncing PodDisruptionBudget %q (%v)", key, dc.clock.Since(startTime))
+		logger.V(4).Info("Finished syncing PodDisruptionBudget", "key", key, "duration", dc.clock.Since(startTime))
 	}()
 
 	namespace, name, err := cache.SplitMetaNamespaceKey(key)
@@ -667,7 +682,7 @@ func (dc *DisruptionController) sync(ctx context.Context, key string) error {
 	}
 	pdb, err := dc.pdbLister.PodDisruptionBudgets(namespace).Get(name)
 	if errors.IsNotFound(err) {
-		klog.V(4).Infof("PodDisruptionBudget %q has been deleted", key)
+		logger.V(4).Info("podDisruptionBudget has been deleted", "key", key)
 		return nil
 	}
 	if err != nil {
@@ -681,7 +696,7 @@ func (dc *DisruptionController) sync(ctx context.Context, key string) error {
 		return err
 	}
 	if err != nil {
-		klog.Errorf("Failed to sync pdb %s/%s: %v", pdb.Namespace, pdb.Name, err)
+		logger.Error(err, "Failed to sync PDB", "podDisruptionBudget", klog.KRef(namespace, name))
 		return dc.failSafe(ctx, pdb, err)
 	}
 
@@ -689,6 +704,7 @@ func (dc *DisruptionController) sync(ctx context.Context, key string) error {
 }
 
 func (dc *DisruptionController) trySync(ctx context.Context, pdb *policy.PodDisruptionBudget) error {
+	logger := klog.FromContext(ctx)
 	pods, err := dc.getPodsForPdb(pdb)
 	if err != nil {
 		dc.recorder.Eventf(pdb, v1.EventTypeWarning, "NoPods", "Failed to get pods: %v", err)
@@ -705,7 +721,7 @@ func (dc *DisruptionController) trySync(ctx context.Context, pdb *policy.PodDisr
 	}
 	// We have unmamanged pods, instead of erroring and hotlooping in disruption controller, log and continue.
 	if len(unmanagedPods) > 0 {
-		klog.Warningf("found unmanaged pods associated with this PDB: %v", unmanagedPods)
+		logger.V(4).Info("Found unmanaged pods associated with this PDB", "pods", unmanagedPods)
 		dc.recorder.Eventf(pdb, v1.EventTypeWarning, "UnmanagedPods", "Pods selected by this PodDisruptionBudget (selector: %v) were found "+
 			"to be unmanaged. As a result, the status of the PDB cannot be calculated correctly, which may result in undefined behavior. "+
 			"To account for these pods please set \".spec.minAvailable\" "+
@@ -713,7 +729,7 @@ func (dc *DisruptionController) trySync(ctx context.Context, pdb *policy.PodDisr
 	}
 
 	currentTime := dc.clock.Now()
-	disruptedPods, recheckTime := dc.buildDisruptedPodMap(pods, pdb, currentTime)
+	disruptedPods, recheckTime := dc.buildDisruptedPodMap(logger, pods, pdb, currentTime)
 	currentHealthy := countHealthyPods(pods, disruptedPods, currentTime)
 	err = dc.updatePdbStatus(ctx, pdb, currentHealthy, desiredHealthy, expectedCount, disruptedPods)
 
@@ -721,23 +737,24 @@ func (dc *DisruptionController) trySync(ctx context.Context, pdb *policy.PodDisr
 		// There is always at most one PDB waiting with a particular name in the queue,
 		// and each PDB in the queue is associated with the lowest timestamp
 		// that was supplied when a PDB with that name was added.
-		dc.enqueuePdbForRecheck(pdb, recheckTime.Sub(currentTime))
+		dc.enqueuePdbForRecheck(logger, pdb, recheckTime.Sub(currentTime))
 	}
 	return err
 }
 
 func (dc *DisruptionController) syncStalePodDisruption(ctx context.Context, key string) error {
+	logger := klog.FromContext(ctx)
 	startTime := dc.clock.Now()
 	namespace, name, err := cache.SplitMetaNamespaceKey(key)
 	if err != nil {
 		return err
 	}
 	defer func() {
-		klog.V(4).InfoS("Finished syncing Pod to clear DisruptionTarget condition", "pod", klog.KRef(namespace, name), "duration", dc.clock.Since(startTime))
+		logger.V(4).Info("Finished syncing Pod to clear DisruptionTarget condition", "pod", klog.KRef(namespace, name), "duration", dc.clock.Since(startTime))
 	}()
 	pod, err := dc.podLister.Pods(namespace).Get(name)
 	if errors.IsNotFound(err) {
-		klog.V(4).InfoS("Skipping clearing DisruptionTarget condition because pod was deleted", "pod", klog.KObj(pod))
+		logger.V(4).Info("Skipping clearing DisruptionTarget condition because pod was deleted", "pod", klog.KObj(pod))
 		return nil
 	}
 	if err != nil {
@@ -749,7 +766,7 @@ func (dc *DisruptionController) syncStalePodDisruption(ctx context.Context, key
 		return nil
 	}
 	if cleanAfter > 0 {
-		dc.enqueueStalePodDisruptionCleanup(pod, cleanAfter)
+		dc.enqueueStalePodDisruptionCleanup(logger, pod, cleanAfter)
 		return nil
 	}
 
@@ -765,7 +782,7 @@ func (dc *DisruptionController) syncStalePodDisruption(ctx context.Context, key
 	if _, err := dc.kubeClient.CoreV1().Pods(pod.Namespace).ApplyStatus(ctx, podApply, metav1.ApplyOptions{FieldManager: fieldManager, Force: true}); err != nil {
 		return err
 	}
-	klog.V(2).InfoS("Reset stale DisruptionTarget condition to False", "pod", klog.KObj(pod))
+	logger.V(2).Info("Reset stale DisruptionTarget condition to False", "pod", klog.KObj(pod))
 	return nil
 }
 
@@ -895,7 +912,7 @@ func countHealthyPods(pods []*v1.Pod, disruptedPods map[string]metav1.Time, curr
 
 // Builds new PodDisruption map, possibly removing items that refer to non-existing, already deleted
 // or not-deleted at all items. Also returns an information when this check should be repeated.
-func (dc *DisruptionController) buildDisruptedPodMap(pods []*v1.Pod, pdb *policy.PodDisruptionBudget, currentTime time.Time) (map[string]metav1.Time, *time.Time) {
+func (dc *DisruptionController) buildDisruptedPodMap(logger klog.Logger, pods []*v1.Pod, pdb *policy.PodDisruptionBudget, currentTime time.Time) (map[string]metav1.Time, *time.Time) {
 	disruptedPods := pdb.Status.DisruptedPods
 	result := make(map[string]metav1.Time)
 	var recheckTime *time.Time
@@ -915,8 +932,8 @@ func (dc *DisruptionController) buildDisruptedPodMap(pods []*v1.Pod, pdb *policy
 		}
 		expectedDeletion := disruptionTime.Time.Add(DeletionTimeout)
 		if expectedDeletion.Before(currentTime) {
-			klog.V(1).Infof("Pod %s/%s was expected to be deleted at %s but it wasn't, updating pdb %s/%s",
-				pod.Namespace, pod.Name, disruptionTime.String(), pdb.Namespace, pdb.Name)
+			logger.V(1).Info("pod was expected to be deleted but it wasn't, updating PDB",
+				"pod", klog.KObj(pod), "deletionTime", disruptionTime, "podDisruptionBudget", klog.KObj(pdb))
 			dc.recorder.Eventf(pod, v1.EventTypeWarning, "NotDeleted", "Pod was expected by PDB %s/%s to be deleted but it wasn't",
 				pdb.Namespace, pdb.Namespace)
 		} else {
@ -18,9 +18,7 @@ package disruption
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
"runtime/debug"
|
||||
"strings"
|
||||
"sync"
|
||||
@ -51,9 +49,9 @@ import (
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/klog/v2"
|
||||
_ "k8s.io/kubernetes/pkg/apis/core/install"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/test/utils/ktesting"
|
||||
clocktesting "k8s.io/utils/clock/testing"
|
||||
"k8s.io/utils/pointer"
|
||||
)
|
||||
@ -149,8 +147,8 @@ var customGVK = schema.GroupVersionKind{
|
||||
Kind: "customresource",
|
||||
}
|
||||
|
||||
func newFakeDisruptionController() (*disruptionController, *pdbStates) {
|
||||
return newFakeDisruptionControllerWithTime(context.TODO(), time.Now())
|
||||
func newFakeDisruptionController(ctx context.Context) (*disruptionController, *pdbStates) {
|
||||
return newFakeDisruptionControllerWithTime(ctx, time.Now())
|
||||
}
|
||||
|
||||
func newFakeDisruptionControllerWithTime(ctx context.Context, now time.Time) (*disruptionController, *pdbStates) {
|
||||
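The test-side counterpart of the migration shows up throughout the hunks below: context.TODO()/context.Background() is replaced by a context obtained from ktesting.NewTestContext(t), so controller logs are tied to the test and routed through the test framework. A minimal sketch of that pattern, assuming the k8s.io/klog/v2/ktesting package (the actual diff goes through the repository's own test/utils/ktesting wrapper):

// Sketch only: per-test logger and context for contextual-logging code.
package example

import (
	"context"
	"testing"

	"k8s.io/klog/v2"
	"k8s.io/klog/v2/ktesting"
)

// doSomething stands in for controller code that pulls its logger out of the
// context, as the disruption controller now does.
func doSomething(ctx context.Context) {
	klog.FromContext(ctx).Info("doing something")
}

// TestContextualLogging shows the shape used by the converted tests: ktesting
// builds a logger that writes through t.Log, and the returned context carries it.
func TestContextualLogging(t *testing.T) {
	_, ctx := ktesting.NewTestContext(t)
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	doSomething(ctx)
}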
@ -168,6 +166,7 @@ func newFakeDisruptionControllerWithTime(ctx context.Context, now time.Time) (*d
|
||||
fakeClock := clocktesting.NewFakeClock(now)
|
||||
|
||||
dc := NewDisruptionControllerInternal(
|
||||
ctx,
|
||||
informerFactory.Core().V1().Pods(),
|
||||
informerFactory.Policy().V1().PodDisruptionBudgets(),
|
||||
informerFactory.Core().V1().ReplicationControllers(),
|
||||
@ -424,14 +423,14 @@ func add(t *testing.T, store cache.Store, obj interface{}) {
|
||||
|
||||
// Create one with no selector. Verify it matches all pods
|
||||
func TestNoSelector(t *testing.T) {
|
||||
dc, ps := newFakeDisruptionController()
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
dc, ps := newFakeDisruptionController(ctx)
|
||||
|
||||
pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromInt32(3))
|
||||
pdb.Spec.Selector = &metav1.LabelSelector{}
|
||||
pod, _ := newPod(t, "yo-yo-yo")
|
||||
|
||||
add(t, dc.pdbStore, pdb)
|
||||
ctx := context.TODO()
|
||||
dc.sync(ctx, pdbName)
|
||||
ps.VerifyPdbStatus(t, pdbName, 0, 0, 3, 0, map[string]metav1.Time{})
|
||||
|
||||
@ -443,10 +442,10 @@ func TestNoSelector(t *testing.T) {
|
||||
// Verify that available/expected counts go up as we add pods, then verify that
|
||||
// available count goes down when we make a pod unavailable.
|
||||
func TestUnavailable(t *testing.T) {
|
||||
dc, ps := newFakeDisruptionController()
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
dc, ps := newFakeDisruptionController(ctx)
|
||||
|
||||
pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromInt32(3))
|
||||
ctx := context.TODO()
|
||||
add(t, dc.pdbStore, pdb)
|
||||
dc.sync(ctx, pdbName)
|
||||
|
||||
@ -473,11 +472,11 @@ func TestUnavailable(t *testing.T) {
|
||||
// Verify that an integer MaxUnavailable won't
|
||||
// allow a disruption for pods with no controller.
|
||||
func TestIntegerMaxUnavailable(t *testing.T) {
|
||||
dc, ps := newFakeDisruptionController()
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
dc, ps := newFakeDisruptionController(ctx)
|
||||
|
||||
pdb, pdbName := newMaxUnavailablePodDisruptionBudget(t, intstr.FromInt32(1))
|
||||
add(t, dc.pdbStore, pdb)
|
||||
ctx := context.TODO()
|
||||
dc.sync(ctx, pdbName)
|
||||
// This verifies that when a PDB has 0 pods, disruptions are not allowed.
|
||||
ps.VerifyDisruptionAllowed(t, pdbName, 0)
|
||||
@ -494,7 +493,8 @@ func TestIntegerMaxUnavailable(t *testing.T) {
|
||||
// Verify that an integer MaxUnavailable will recompute allowed disruptions when the scale of
|
||||
// the selected pod's controller is modified.
|
||||
func TestIntegerMaxUnavailableWithScaling(t *testing.T) {
|
||||
dc, ps := newFakeDisruptionController()
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
dc, ps := newFakeDisruptionController(ctx)
|
||||
|
||||
pdb, pdbName := newMaxUnavailablePodDisruptionBudget(t, intstr.FromInt32(2))
|
||||
add(t, dc.pdbStore, pdb)
|
||||
@ -504,7 +504,6 @@ func TestIntegerMaxUnavailableWithScaling(t *testing.T) {
|
||||
|
||||
pod, _ := newPod(t, "pod")
|
||||
updatePodOwnerToRs(t, pod, rs)
|
||||
ctx := context.TODO()
|
||||
add(t, dc.podStore, pod)
|
||||
dc.sync(ctx, pdbName)
|
||||
ps.VerifyPdbStatus(t, pdbName, 0, 1, 5, 7, map[string]metav1.Time{})
|
||||
@ -520,7 +519,8 @@ func TestIntegerMaxUnavailableWithScaling(t *testing.T) {
|
||||
// Verify that an percentage MaxUnavailable will recompute allowed disruptions when the scale of
|
||||
// the selected pod's controller is modified.
|
||||
func TestPercentageMaxUnavailableWithScaling(t *testing.T) {
|
||||
dc, ps := newFakeDisruptionController()
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
dc, ps := newFakeDisruptionController(ctx)
|
||||
|
||||
pdb, pdbName := newMaxUnavailablePodDisruptionBudget(t, intstr.FromString("30%"))
|
||||
add(t, dc.pdbStore, pdb)
|
||||
@ -531,7 +531,6 @@ func TestPercentageMaxUnavailableWithScaling(t *testing.T) {
|
||||
pod, _ := newPod(t, "pod")
|
||||
updatePodOwnerToRs(t, pod, rs)
|
||||
add(t, dc.podStore, pod)
|
||||
ctx := context.TODO()
|
||||
dc.sync(ctx, pdbName)
|
||||
ps.VerifyPdbStatus(t, pdbName, 0, 1, 4, 7, map[string]metav1.Time{})
|
||||
|
||||
@ -546,11 +545,11 @@ func TestPercentageMaxUnavailableWithScaling(t *testing.T) {
|
||||
// Create a pod with no controller, and verify that a PDB with a percentage
|
||||
// specified won't allow a disruption.
|
||||
func TestNakedPod(t *testing.T) {
|
||||
dc, ps := newFakeDisruptionController()
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
dc, ps := newFakeDisruptionController(ctx)
|
||||
|
||||
pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("28%"))
|
||||
add(t, dc.pdbStore, pdb)
|
||||
ctx := context.TODO()
|
||||
dc.sync(ctx, pdbName)
|
||||
// This verifies that when a PDB has 0 pods, disruptions are not allowed.
|
||||
ps.VerifyDisruptionAllowed(t, pdbName, 0)
|
||||
@ -566,11 +565,11 @@ func TestNakedPod(t *testing.T) {
|
||||
// Create a pod with unsupported controller, and verify that a PDB with a percentage
|
||||
// specified won't allow a disruption.
|
||||
func TestUnsupportedControllerPod(t *testing.T) {
|
||||
dc, ps := newFakeDisruptionController()
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
dc, ps := newFakeDisruptionController(ctx)
|
||||
|
||||
pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("28%"))
|
||||
add(t, dc.pdbStore, pdb)
|
||||
ctx := context.TODO()
|
||||
dc.sync(ctx, pdbName)
|
||||
// This verifies that when a PDB has 0 pods, disruptions are not allowed.
|
||||
ps.VerifyDisruptionAllowed(t, pdbName, 0)
|
||||
@ -594,11 +593,11 @@ func TestUnsupportedControllerPod(t *testing.T) {
|
||||
|
||||
// Verify that disruption controller is not erroring when unmanaged pods are found
|
||||
func TestStatusForUnmanagedPod(t *testing.T) {
|
||||
dc, ps := newFakeDisruptionController()
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
dc, ps := newFakeDisruptionController(ctx)
|
||||
|
||||
pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("28%"))
|
||||
add(t, dc.pdbStore, pdb)
|
||||
ctx := context.TODO()
|
||||
dc.sync(ctx, pdbName)
|
||||
// This verifies that when a PDB has 0 pods, disruptions are not allowed.
|
||||
ps.VerifyDisruptionAllowed(t, pdbName, 0)
|
||||
@ -612,11 +611,11 @@ func TestStatusForUnmanagedPod(t *testing.T) {
|
||||
|
||||
// Check if the unmanaged pods are correctly collected or not
|
||||
func TestTotalUnmanagedPods(t *testing.T) {
|
||||
dc, ps := newFakeDisruptionController()
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
dc, ps := newFakeDisruptionController(ctx)
|
||||
|
||||
pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("28%"))
|
||||
add(t, dc.pdbStore, pdb)
|
||||
ctx := context.TODO()
|
||||
dc.sync(ctx, pdbName)
|
||||
// This verifies that when a PDB has 0 pods, disruptions are not allowed.
|
||||
ps.VerifyDisruptionAllowed(t, pdbName, 0)
|
||||
@ -636,14 +635,14 @@ func TestTotalUnmanagedPods(t *testing.T) {
|
||||
|
||||
// Verify that we count the scale of a ReplicaSet even when it has no Deployment.
|
||||
func TestReplicaSet(t *testing.T) {
|
||||
dc, ps := newFakeDisruptionController()
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
dc, ps := newFakeDisruptionController(ctx)
|
||||
|
||||
pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("20%"))
|
||||
add(t, dc.pdbStore, pdb)
|
||||
|
||||
rs, _ := newReplicaSet(t, 10)
|
||||
add(t, dc.rsStore, rs)
|
||||
ctx := context.TODO()
|
||||
pod, _ := newPod(t, "pod")
|
||||
updatePodOwnerToRs(t, pod, rs)
|
||||
add(t, dc.podStore, pod)
|
||||
@ -657,7 +656,8 @@ func TestScaleResource(t *testing.T) {
|
||||
pods := int32(4)
|
||||
maxUnavailable := int32(5)
|
||||
|
||||
dc, ps := newFakeDisruptionController()
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
dc, ps := newFakeDisruptionController(ctx)
|
||||
|
||||
dc.scaleClient.AddReactor("get", "customresources", func(action core.Action) (handled bool, ret runtime.Object, err error) {
|
||||
obj := &autoscalingapi.Scale{
|
||||
@ -688,7 +688,6 @@ func TestScaleResource(t *testing.T) {
|
||||
})
|
||||
add(t, dc.podStore, pod)
|
||||
}
|
||||
ctx := context.TODO()
|
||||
dc.sync(ctx, pdbName)
|
||||
disruptionsAllowed := int32(0)
|
||||
if replicas-pods < maxUnavailable {
|
||||
@ -750,7 +749,8 @@ func TestScaleFinderNoResource(t *testing.T) {
|
||||
t.Run(tn, func(t *testing.T) {
|
||||
customResourceUID := uuid.NewUUID()
|
||||
|
||||
dc, _ := newFakeDisruptionController()
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
dc, _ := newFakeDisruptionController(ctx)
|
||||
|
||||
dc.scaleClient.AddReactor("get", resourceName, func(action core.Action) (handled bool, ret runtime.Object, err error) {
|
||||
gr := schema.GroupResource{
|
||||
@ -774,7 +774,7 @@ func TestScaleFinderNoResource(t *testing.T) {
|
||||
UID: customResourceUID,
|
||||
}
|
||||
|
||||
_, err := dc.getScaleController(context.TODO(), ownerRef, "default")
|
||||
_, err := dc.getScaleController(ctx, ownerRef, "default")
|
||||
|
||||
if tc.expectError && err == nil {
|
||||
t.Error("expected error, but didn't get one")
|
||||
@ -790,8 +790,8 @@ func TestScaleFinderNoResource(t *testing.T) {
|
||||
// Verify that multiple controllers doesn't allow the PDB to be set true.
|
||||
func TestMultipleControllers(t *testing.T) {
|
||||
const podCount = 2
|
||||
|
||||
dc, ps := newFakeDisruptionController()
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
dc, ps := newFakeDisruptionController(ctx)
|
||||
|
||||
pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("1%"))
|
||||
add(t, dc.pdbStore, pdb)
|
||||
@ -802,7 +802,6 @@ func TestMultipleControllers(t *testing.T) {
|
||||
pods = append(pods, pod)
|
||||
add(t, dc.podStore, pod)
|
||||
}
|
||||
ctx := context.TODO()
|
||||
dc.sync(ctx, pdbName)
|
||||
|
||||
// No controllers yet => no disruption allowed
|
||||
@ -840,8 +839,8 @@ func TestReplicationController(t *testing.T) {
|
||||
"foo": "bar",
|
||||
"baz": "quux",
|
||||
}
|
||||
|
||||
dc, ps := newFakeDisruptionController()
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
dc, ps := newFakeDisruptionController(ctx)
|
||||
|
||||
// 34% should round up to 2
|
||||
pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("34%"))
|
||||
@ -849,7 +848,6 @@ func TestReplicationController(t *testing.T) {
|
||||
rc, _ := newReplicationController(t, 3)
|
||||
rc.Spec.Selector = labels
|
||||
add(t, dc.rcStore, rc)
|
||||
ctx := context.TODO()
|
||||
dc.sync(ctx, pdbName)
|
||||
|
||||
// It starts out at 0 expected because, with no pods, the PDB doesn't know
|
||||
@ -881,14 +879,14 @@ func TestStatefulSetController(t *testing.T) {
|
||||
"baz": "quux",
|
||||
}
|
||||
|
||||
dc, ps := newFakeDisruptionController()
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
dc, ps := newFakeDisruptionController(ctx)
|
||||
|
||||
// 34% should round up to 2
|
||||
pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("34%"))
|
||||
add(t, dc.pdbStore, pdb)
|
||||
ss, _ := newStatefulSet(t, 3)
|
||||
add(t, dc.ssStore, ss)
|
||||
ctx := context.TODO()
|
||||
dc.sync(ctx, pdbName)
|
||||
|
||||
// It starts out at 0 expected because, with no pods, the PDB doesn't know
|
||||
@ -920,7 +918,8 @@ func TestTwoControllers(t *testing.T) {
|
||||
"foo": "bar",
|
||||
"baz": "quuux",
|
||||
}
|
||||
dc, ps := newFakeDisruptionController()
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
dc, ps := newFakeDisruptionController(ctx)
|
||||
|
||||
// These constants are related, but I avoid calculating the correct values in
|
||||
// code. If you update a parameter here, recalculate the correct values for
|
||||
@ -935,7 +934,6 @@ func TestTwoControllers(t *testing.T) {
|
||||
rc, _ := newReplicationController(t, collectionSize)
|
||||
rc.Spec.Selector = rcLabels
|
||||
add(t, dc.rcStore, rc)
|
||||
ctx := context.TODO()
|
||||
dc.sync(ctx, pdbName)
|
||||
|
||||
ps.VerifyPdbStatus(t, pdbName, 0, 0, 0, 0, map[string]metav1.Time{})
|
||||
@ -1019,16 +1017,18 @@ func TestTwoControllers(t *testing.T) {
|
||||
|
||||
// Test pdb doesn't exist
|
||||
func TestPDBNotExist(t *testing.T) {
|
||||
dc, _ := newFakeDisruptionController()
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
dc, _ := newFakeDisruptionController(ctx)
|
||||
pdb, _ := newMinAvailablePodDisruptionBudget(t, intstr.FromString("67%"))
|
||||
add(t, dc.pdbStore, pdb)
|
||||
if err := dc.sync(context.TODO(), "notExist"); err != nil {
|
||||
if err := dc.sync(ctx, "notExist"); err != nil {
|
||||
t.Errorf("Unexpected error: %v, expect nil", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpdateDisruptedPods(t *testing.T) {
|
||||
dc, ps := newFakeDisruptionController()
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
dc, ps := newFakeDisruptionController(ctx)
|
||||
dc.recheckQueue = workqueue.NewNamedDelayingQueue("pdb_queue")
|
||||
pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromInt32(1))
|
||||
currentTime := dc.clock.Now()
|
||||
@ -1049,13 +1049,14 @@ func TestUpdateDisruptedPods(t *testing.T) {
|
||||
add(t, dc.podStore, pod2)
|
||||
add(t, dc.podStore, pod3)
|
||||
|
||||
dc.sync(context.TODO(), pdbName)
|
||||
dc.sync(ctx, pdbName)
|
||||
|
||||
ps.VerifyPdbStatus(t, pdbName, 0, 1, 1, 3, map[string]metav1.Time{"p3": {Time: currentTime.Add(-time.Minute)}})
|
||||
}
|
||||
|
||||
func TestBasicFinderFunctions(t *testing.T) {
|
||||
dc, _ := newFakeDisruptionController()
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
dc, _ := newFakeDisruptionController(ctx)
|
||||
|
||||
rs, _ := newReplicaSet(t, 10)
|
||||
add(t, dc.rsStore, rs)
|
||||
@ -1135,7 +1136,7 @@ func TestBasicFinderFunctions(t *testing.T) {
|
||||
UID: tc.uid,
|
||||
}
|
||||
|
||||
controllerAndScale, _ := tc.finderFunc(context.TODO(), controllerRef, metav1.NamespaceDefault)
|
||||
controllerAndScale, _ := tc.finderFunc(ctx, controllerRef, metav1.NamespaceDefault)
|
||||
|
||||
if controllerAndScale == nil {
|
||||
if tc.findsScale {
|
||||
@ -1208,7 +1209,8 @@ func TestDeploymentFinderFunction(t *testing.T) {
|
||||
|
||||
for tn, tc := range testCases {
|
||||
t.Run(tn, func(t *testing.T) {
|
||||
dc, _ := newFakeDisruptionController()
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
dc, _ := newFakeDisruptionController(ctx)
|
||||
|
||||
dep, _ := newDeployment(t, 10)
|
||||
dep.Spec.Selector = newSel(labels)
|
||||
@ -1233,7 +1235,7 @@ func TestDeploymentFinderFunction(t *testing.T) {
|
||||
UID: rs.UID,
|
||||
}
|
||||
|
||||
controllerAndScale, _ := dc.getPodDeployment(context.TODO(), controllerRef, metav1.NamespaceDefault)
|
||||
controllerAndScale, _ := dc.getPodDeployment(ctx, controllerRef, metav1.NamespaceDefault)
|
||||
|
||||
if controllerAndScale == nil {
|
||||
if tc.findsScale {
|
||||
@ -1267,10 +1269,10 @@ func TestDeploymentFinderFunction(t *testing.T) {
|
||||
// (C) If the DisruptionController writes DisruptionsAllowed=1 despite the
|
||||
// resource conflict error, then there is a bug.
|
||||
func TestUpdatePDBStatusRetries(t *testing.T) {
|
||||
dc, _ := newFakeDisruptionController()
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
dc, _ := newFakeDisruptionController(ctx)
|
||||
// Inject the production code over our fake impl
|
||||
dc.getUpdater = func() updater { return dc.writePdbStatus }
|
||||
ctx := context.TODO()
|
||||
// Create a PDB and 3 pods that match it.
|
||||
pdb, pdbKey := newMinAvailablePodDisruptionBudget(t, intstr.FromInt32(1))
|
||||
pdb, err := dc.coreClient.PolicyV1().PodDisruptionBudgets(pdb.Namespace).Create(ctx, pdb, metav1.CreateOptions{})
|
||||
@ -1380,8 +1382,6 @@ func TestUpdatePDBStatusRetries(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestInvalidSelectors(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
testCases := map[string]struct {
|
||||
labelSelector *metav1.LabelSelector
|
||||
}{
|
||||
@ -1407,7 +1407,9 @@ func TestInvalidSelectors(t *testing.T) {
|
||||
|
||||
for tn, tc := range testCases {
|
||||
t.Run(tn, func(t *testing.T) {
|
||||
dc, ps := newFakeDisruptionController()
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
|
||||
dc, ps := newFakeDisruptionController(ctx)
|
||||
|
||||
pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromInt32(3))
|
||||
pdb.Spec.Selector = tc.labelSelector
|
||||
@ -1536,7 +1538,8 @@ func TestStalePodDisruption(t *testing.T) {
|
||||
}
|
||||
for name, tc := range cases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
dc, _ := newFakeDisruptionControllerWithTime(ctx, now)
|
||||
go dc.Run(ctx)
|
||||
@ -1584,9 +1587,3 @@ func verifyEventEmitted(t *testing.T, dc *disruptionController, expectedEvent st
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestMain adds klog flags to make debugging tests easier.
|
||||
func TestMain(m *testing.M) {
|
||||
klog.InitFlags(flag.CommandLine)
|
||||
os.Exit(m.Run())
|
||||
}
|
||||
|
@ -50,13 +50,14 @@ import (
|
||||
"k8s.io/kubernetes/test/integration/etcd"
|
||||
"k8s.io/kubernetes/test/integration/framework"
|
||||
"k8s.io/kubernetes/test/integration/util"
|
||||
"k8s.io/kubernetes/test/utils/ktesting"
|
||||
"k8s.io/utils/clock"
|
||||
"k8s.io/utils/pointer"
|
||||
)
|
||||
|
||||
const stalePodDisruptionTimeout = 3 * time.Second
|
||||
|
||||
func setup(t *testing.T) (*kubeapiservertesting.TestServer, *disruption.DisruptionController, informers.SharedInformerFactory, clientset.Interface, *apiextensionsclientset.Clientset, dynamic.Interface) {
|
||||
func setup(ctx context.Context, t *testing.T) (*kubeapiservertesting.TestServer, *disruption.DisruptionController, informers.SharedInformerFactory, clientset.Interface, *apiextensionsclientset.Clientset, dynamic.Interface) {
|
||||
server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins", "ServiceAccount"}, framework.SharedEtcd())
|
||||
|
||||
clientSet, err := clientset.NewForConfig(server.ClientConfig)
|
||||
@ -88,6 +89,7 @@ func setup(t *testing.T) (*kubeapiservertesting.TestServer, *disruption.Disrupti
|
||||
}
|
||||
|
||||
pdbc := disruption.NewDisruptionControllerInternal(
|
||||
ctx,
|
||||
informers.Core().V1().Pods(),
|
||||
informers.Policy().V1().PodDisruptionBudgets(),
|
||||
informers.Core().V1().ReplicationControllers(),
|
||||
@ -105,11 +107,13 @@ func setup(t *testing.T) (*kubeapiservertesting.TestServer, *disruption.Disrupti
|
||||
}
|
||||
|
||||
func TestPDBWithScaleSubresource(t *testing.T) {
|
||||
s, pdbc, informers, clientSet, apiExtensionClient, dynamicClient := setup(t)
|
||||
defer s.TearDownFn()
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
s, pdbc, informers, clientSet, apiExtensionClient, dynamicClient := setup(ctx, t)
|
||||
defer s.TearDownFn()
|
||||
defer cancel()
|
||||
|
||||
nsName := "pdb-scale-subresource"
|
||||
createNs(ctx, t, nsName, clientSet)
|
||||
|
||||
@ -238,10 +242,11 @@ func TestEmptySelector(t *testing.T) {
|
||||
|
||||
for i, tc := range testcases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
s, pdbc, informers, clientSet, _, _ := setup(t)
|
||||
defer s.TearDownFn()
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
s, pdbc, informers, clientSet, _, _ := setup(ctx, t)
|
||||
defer s.TearDownFn()
|
||||
defer cancel()
|
||||
|
||||
nsName := fmt.Sprintf("pdb-empty-selector-%d", i)
|
||||
@ -354,10 +359,11 @@ func TestSelectorsForPodsWithoutLabels(t *testing.T) {
|
||||
|
||||
for i, tc := range testcases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
s, pdbc, informers, clientSet, _, _ := setup(t)
|
||||
defer s.TearDownFn()
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
s, pdbc, informers, clientSet, _, _ := setup(ctx, t)
|
||||
defer s.TearDownFn()
|
||||
defer cancel()
|
||||
|
||||
nsName := fmt.Sprintf("pdb-selectors-%d", i)
|
||||
@ -508,12 +514,12 @@ func waitToObservePods(t *testing.T, podInformer cache.SharedIndexInformer, podN
|
||||
}
|
||||
|
||||
func TestPatchCompatibility(t *testing.T) {
|
||||
s, pdbc, _, clientSet, _, _ := setup(t)
|
||||
defer s.TearDownFn()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
s, pdbc, _, clientSet, _, _ := setup(ctx, t)
|
||||
defer s.TearDownFn()
|
||||
// Even though pdbc isn't used in this test, its creation is already
|
||||
// spawning some goroutines. So we need to run it to ensure they won't leak.
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
pdbc.Run(ctx)
|
||||
|
||||
@ -653,10 +659,11 @@ func TestPatchCompatibility(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStalePodDisruption(t *testing.T) {
|
||||
s, pdbc, informers, clientSet, _, _ := setup(t)
|
||||
defer s.TearDownFn()
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
s, pdbc, informers, clientSet, _, _ := setup(ctx, t)
|
||||
defer s.TearDownFn()
|
||||
defer cancel()
|
||||
|
||||
nsName := "pdb-stale-pod-disruption"
|
||||
|
@ -57,6 +57,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/controller/disruption"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/test/integration/framework"
|
||||
"k8s.io/kubernetes/test/utils/ktesting"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -68,14 +69,16 @@ const (
|
||||
func TestConcurrentEvictionRequests(t *testing.T) {
|
||||
podNameFormat := "test-pod-%d"
|
||||
|
||||
closeFn, rm, informers, _, clientSet := rmSetup(t)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
closeFn, rm, informers, _, clientSet := rmSetup(ctx, t)
|
||||
defer closeFn()
|
||||
|
||||
ns := framework.CreateNamespaceOrDie(clientSet, "concurrent-eviction-requests", t)
|
||||
defer framework.DeleteNamespaceOrDie(clientSet, ns, t)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
informers.Start(ctx.Done())
|
||||
go rm.Run(ctx)
|
||||
|
||||
@ -181,14 +184,16 @@ func TestConcurrentEvictionRequests(t *testing.T) {
|
||||
|
||||
// TestTerminalPodEviction ensures that PDB is not checked for terminal pods.
|
||||
func TestTerminalPodEviction(t *testing.T) {
|
||||
closeFn, rm, informers, _, clientSet := rmSetup(t)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
closeFn, rm, informers, _, clientSet := rmSetup(ctx, t)
|
||||
defer closeFn()
|
||||
|
||||
ns := framework.CreateNamespaceOrDie(clientSet, "terminalpod-eviction", t)
|
||||
defer framework.DeleteNamespaceOrDie(clientSet, ns, t)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
informers.Start(ctx.Done())
|
||||
go rm.Run(ctx)
|
||||
|
||||
@ -255,11 +260,13 @@ func TestTerminalPodEviction(t *testing.T) {
|
||||
|
||||
// TestEvictionVersions ensures the eviction endpoint accepts and returns the correct API versions
|
||||
func TestEvictionVersions(t *testing.T) {
|
||||
closeFn, rm, informers, config, clientSet := rmSetup(t)
|
||||
defer closeFn()
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
closeFn, rm, informers, config, clientSet := rmSetup(ctx, t)
|
||||
defer closeFn()
|
||||
defer cancel()
|
||||
|
||||
informers.Start(ctx.Done())
|
||||
go rm.Run(ctx)
|
||||
|
||||
@ -379,15 +386,17 @@ func TestEvictionWithFinalizers(t *testing.T) {
|
||||
}
|
||||
for name, tc := range cases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
closeFn, rm, informers, _, clientSet := rmSetup(t)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
closeFn, rm, informers, _, clientSet := rmSetup(ctx, t)
|
||||
defer closeFn()
|
||||
|
||||
ns := framework.CreateNamespaceOrDie(clientSet, "eviction-with-finalizers", t)
|
||||
defer framework.DeleteNamespaceOrDie(clientSet, ns, t)
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodDisruptionConditions, tc.enablePodDisruptionConditions)()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
informers.Start(ctx.Done())
|
||||
go rm.Run(ctx)
|
||||
|
||||
@ -465,15 +474,17 @@ func TestEvictionWithUnhealthyPodEvictionPolicy(t *testing.T) {
|
||||
}
|
||||
for name, tc := range cases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PDBUnhealthyPodEvictionPolicy, tc.enableUnhealthyPodEvictionPolicy)()
|
||||
closeFn, rm, informers, _, clientSet := rmSetup(t)
|
||||
closeFn, rm, informers, _, clientSet := rmSetup(ctx, t)
|
||||
defer closeFn()
|
||||
|
||||
ns := framework.CreateNamespaceOrDie(clientSet, "eviction-with-pdb-pod-healthy-policy", t)
|
||||
defer framework.DeleteNamespaceOrDie(clientSet, ns, t)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
informers.Start(ctx.Done())
|
||||
go rm.Run(ctx)
|
||||
|
||||
@ -559,13 +570,15 @@ func TestEvictionWithPrecondition(t *testing.T) {
|
||||
}
|
||||
for name, tc := range cases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
closeFn, rm, informers, _, clientSet := rmSetup(t)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
closeFn, rm, informers, _, clientSet := rmSetup(ctx, t)
|
||||
defer closeFn()
|
||||
|
||||
ns := framework.CreateNamespaceOrDie(clientSet, "eviction-with-preconditions", t)
|
||||
defer framework.DeleteNamespaceOrDie(clientSet, ns, t)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
informers.Start(ctx.Done())
|
||||
go rm.Run(ctx)
|
||||
@ -685,7 +698,7 @@ func newV1Eviction(ns, evictionName string, deleteOption metav1.DeleteOptions) *
|
||||
}
|
||||
}
|
||||
|
||||
func rmSetup(t *testing.T) (kubeapiservertesting.TearDownFunc, *disruption.DisruptionController, informers.SharedInformerFactory, *restclient.Config, clientset.Interface) {
|
||||
func rmSetup(ctx context.Context, t *testing.T) (kubeapiservertesting.TearDownFunc, *disruption.DisruptionController, informers.SharedInformerFactory, *restclient.Config, clientset.Interface) {
|
||||
// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
|
||||
server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
|
||||
|
||||
@ -709,6 +722,7 @@ func rmSetup(t *testing.T) (kubeapiservertesting.TearDownFunc, *disruption.Disru
|
||||
}
|
||||
|
||||
rm := disruption.NewDisruptionController(
|
||||
ctx,
|
||||
informers.Core().V1().Pods(),
|
||||
informers.Policy().V1().PodDisruptionBudgets(),
|
||||
informers.Core().V1().ReplicationControllers(),
|
||||
|
@ -605,6 +605,7 @@ func PodScheduled(c clientset.Interface, podNamespace, podName string) wait.Cond
|
||||
// InitDisruptionController initializes and runs a Disruption Controller to properly
|
||||
// update PodDisuptionBudget objects.
|
||||
func InitDisruptionController(t *testing.T, testCtx *TestContext) *disruption.DisruptionController {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
informers := informers.NewSharedInformerFactory(testCtx.ClientSet, 12*time.Hour)
|
||||
|
||||
discoveryClient := cacheddiscovery.NewMemCacheClient(testCtx.ClientSet.Discovery())
|
||||
@ -618,6 +619,7 @@ func InitDisruptionController(t *testing.T, testCtx *TestContext) *disruption.Di
|
||||
}
|
||||
|
||||
dc := disruption.NewDisruptionController(
|
||||
ctx,
|
||||
informers.Core().V1().Pods(),
|
||||
informers.Policy().V1().PodDisruptionBudgets(),
|
||||
informers.Core().V1().ReplicationControllers(),