Merge pull request #107935 from ravisantoshgudimetla/wire-contexts-disruption

Wire contexts to Disruption controllers
This commit is contained in:
Kubernetes Prow Robot 2022-02-04 10:08:13 -08:00 committed by GitHub
commit c1190f5aa2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 152 additions and 141 deletions

View File

@ -58,6 +58,6 @@ func startDisruptionController(ctx context.Context, controllerContext Controller
controllerContext.RESTMapper, controllerContext.RESTMapper,
scaleClient, scaleClient,
client.Discovery(), client.Discovery(),
).Run(ctx.Done()) ).Run(ctx)
return nil, true, nil return nil, true, nil
} }

View File

@ -68,7 +68,7 @@ const (
DeletionTimeout = 2 * 60 * time.Second DeletionTimeout = 2 * 60 * time.Second
) )
type updater func(*policy.PodDisruptionBudget) error type updater func(context.Context, *policy.PodDisruptionBudget) error
type DisruptionController struct { type DisruptionController struct {
kubeClient clientset.Interface kubeClient clientset.Interface
@ -114,7 +114,7 @@ type controllerAndScale struct {
// podControllerFinder is a function type that maps a pod to a list of // podControllerFinder is a function type that maps a pod to a list of
// controllers and their scale. // controllers and their scale.
type podControllerFinder func(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) type podControllerFinder func(ctx context.Context, controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error)
func NewDisruptionController( func NewDisruptionController(
podInformer coreinformers.PodInformer, podInformer coreinformers.PodInformer,
@ -192,7 +192,7 @@ var (
) )
// getPodReplicaSet finds a replicaset which has no matching deployments. // getPodReplicaSet finds a replicaset which has no matching deployments.
func (dc *DisruptionController) getPodReplicaSet(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) { func (dc *DisruptionController) getPodReplicaSet(ctx context.Context, controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
ok, err := verifyGroupKind(controllerRef, controllerKindRS.Kind, []string{"apps", "extensions"}) ok, err := verifyGroupKind(controllerRef, controllerKindRS.Kind, []string{"apps", "extensions"})
if !ok || err != nil { if !ok || err != nil {
return nil, err return nil, err
@ -214,7 +214,7 @@ func (dc *DisruptionController) getPodReplicaSet(controllerRef *metav1.OwnerRefe
} }
// getPodStatefulSet returns the statefulset referenced by the provided controllerRef. // getPodStatefulSet returns the statefulset referenced by the provided controllerRef.
func (dc *DisruptionController) getPodStatefulSet(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) { func (dc *DisruptionController) getPodStatefulSet(ctx context.Context, controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
ok, err := verifyGroupKind(controllerRef, controllerKindSS.Kind, []string{"apps"}) ok, err := verifyGroupKind(controllerRef, controllerKindSS.Kind, []string{"apps"})
if !ok || err != nil { if !ok || err != nil {
return nil, err return nil, err
@ -232,7 +232,7 @@ func (dc *DisruptionController) getPodStatefulSet(controllerRef *metav1.OwnerRef
} }
// getPodDeployments finds deployments for any replicasets which are being managed by deployments. // getPodDeployments finds deployments for any replicasets which are being managed by deployments.
func (dc *DisruptionController) getPodDeployment(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) { func (dc *DisruptionController) getPodDeployment(ctx context.Context, controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
ok, err := verifyGroupKind(controllerRef, controllerKindRS.Kind, []string{"apps", "extensions"}) ok, err := verifyGroupKind(controllerRef, controllerKindRS.Kind, []string{"apps", "extensions"})
if !ok || err != nil { if !ok || err != nil {
return nil, err return nil, err
@ -265,7 +265,7 @@ func (dc *DisruptionController) getPodDeployment(controllerRef *metav1.OwnerRefe
return &controllerAndScale{deployment.UID, *(deployment.Spec.Replicas)}, nil return &controllerAndScale{deployment.UID, *(deployment.Spec.Replicas)}, nil
} }
func (dc *DisruptionController) getPodReplicationController(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) { func (dc *DisruptionController) getPodReplicationController(ctx context.Context, controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
ok, err := verifyGroupKind(controllerRef, controllerKindRC.Kind, []string{""}) ok, err := verifyGroupKind(controllerRef, controllerKindRC.Kind, []string{""})
if !ok || err != nil { if !ok || err != nil {
return nil, err return nil, err
@ -281,7 +281,7 @@ func (dc *DisruptionController) getPodReplicationController(controllerRef *metav
return &controllerAndScale{rc.UID, *(rc.Spec.Replicas)}, nil return &controllerAndScale{rc.UID, *(rc.Spec.Replicas)}, nil
} }
func (dc *DisruptionController) getScaleController(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) { func (dc *DisruptionController) getScaleController(ctx context.Context, controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
gv, err := schema.ParseGroupVersion(controllerRef.APIVersion) gv, err := schema.ParseGroupVersion(controllerRef.APIVersion)
if err != nil { if err != nil {
return nil, err return nil, err
@ -298,7 +298,7 @@ func (dc *DisruptionController) getScaleController(controllerRef *metav1.OwnerRe
} }
gr := mapping.Resource.GroupResource() gr := mapping.Resource.GroupResource()
scale, err := dc.scaleNamespacer.Scales(namespace).Get(context.TODO(), gr, controllerRef.Name, metav1.GetOptions{}) scale, err := dc.scaleNamespacer.Scales(namespace).Get(ctx, gr, controllerRef.Name, metav1.GetOptions{})
if err != nil { if err != nil {
if errors.IsNotFound(err) { if errors.IsNotFound(err) {
// The IsNotFound error can mean either that the resource does not exist, // The IsNotFound error can mean either that the resource does not exist,
@ -356,14 +356,14 @@ func verifyGroupKind(controllerRef *metav1.OwnerReference, expectedKind string,
return false, nil return false, nil
} }
func (dc *DisruptionController) Run(stopCh <-chan struct{}) { func (dc *DisruptionController) Run(ctx context.Context) {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
defer dc.queue.ShutDown() defer dc.queue.ShutDown()
klog.Infof("Starting disruption controller") klog.Infof("Starting disruption controller")
defer klog.Infof("Shutting down disruption controller") defer klog.Infof("Shutting down disruption controller")
if !cache.WaitForNamedCacheSync("disruption", stopCh, dc.podListerSynced, dc.pdbListerSynced, dc.rcListerSynced, dc.rsListerSynced, dc.dListerSynced, dc.ssListerSynced) { if !cache.WaitForNamedCacheSync("disruption", ctx.Done(), dc.podListerSynced, dc.pdbListerSynced, dc.rcListerSynced, dc.rsListerSynced, dc.dListerSynced, dc.ssListerSynced) {
return return
} }
@ -373,10 +373,10 @@ func (dc *DisruptionController) Run(stopCh <-chan struct{}) {
} else { } else {
klog.Infof("No api server defined - no events will be sent to API server.") klog.Infof("No api server defined - no events will be sent to API server.")
} }
go wait.Until(dc.worker, time.Second, stopCh) go wait.UntilWithContext(ctx, dc.worker, time.Second)
go wait.Until(dc.recheckWorker, time.Second, stopCh) go wait.Until(dc.recheckWorker, time.Second, ctx.Done())
<-stopCh <-ctx.Done()
} }
func (dc *DisruptionController) addDb(obj interface{}) { func (dc *DisruptionController) addDb(obj interface{}) {
@ -513,19 +513,19 @@ func (dc *DisruptionController) getPodsForPdb(pdb *policy.PodDisruptionBudget) (
return pods, nil return pods, nil
} }
func (dc *DisruptionController) worker() { func (dc *DisruptionController) worker(ctx context.Context) {
for dc.processNextWorkItem() { for dc.processNextWorkItem(ctx) {
} }
} }
func (dc *DisruptionController) processNextWorkItem() bool { func (dc *DisruptionController) processNextWorkItem(ctx context.Context) bool {
dKey, quit := dc.queue.Get() dKey, quit := dc.queue.Get()
if quit { if quit {
return false return false
} }
defer dc.queue.Done(dKey) defer dc.queue.Done(dKey)
err := dc.sync(dKey.(string)) err := dc.sync(ctx, dKey.(string))
if err == nil { if err == nil {
dc.queue.Forget(dKey) dc.queue.Forget(dKey)
return true return true
@ -552,7 +552,7 @@ func (dc *DisruptionController) processNextRecheckWorkItem() bool {
return true return true
} }
func (dc *DisruptionController) sync(key string) error { func (dc *DisruptionController) sync(ctx context.Context, key string) error {
startTime := time.Now() startTime := time.Now()
defer func() { defer func() {
klog.V(4).Infof("Finished syncing PodDisruptionBudget %q (%v)", key, time.Since(startTime)) klog.V(4).Infof("Finished syncing PodDisruptionBudget %q (%v)", key, time.Since(startTime))
@ -571,7 +571,7 @@ func (dc *DisruptionController) sync(key string) error {
return err return err
} }
err = dc.trySync(pdb) err = dc.trySync(ctx, pdb)
// If the reason for failure was a conflict, then allow this PDB update to be // If the reason for failure was a conflict, then allow this PDB update to be
// requeued without triggering the failSafe logic. // requeued without triggering the failSafe logic.
if errors.IsConflict(err) { if errors.IsConflict(err) {
@ -579,13 +579,13 @@ func (dc *DisruptionController) sync(key string) error {
} }
if err != nil { if err != nil {
klog.Errorf("Failed to sync pdb %s/%s: %v", pdb.Namespace, pdb.Name, err) klog.Errorf("Failed to sync pdb %s/%s: %v", pdb.Namespace, pdb.Name, err)
return dc.failSafe(pdb, err) return dc.failSafe(ctx, pdb, err)
} }
return nil return nil
} }
func (dc *DisruptionController) trySync(pdb *policy.PodDisruptionBudget) error { func (dc *DisruptionController) trySync(ctx context.Context, pdb *policy.PodDisruptionBudget) error {
pods, err := dc.getPodsForPdb(pdb) pods, err := dc.getPodsForPdb(pdb)
if err != nil { if err != nil {
dc.recorder.Eventf(pdb, v1.EventTypeWarning, "NoPods", "Failed to get pods: %v", err) dc.recorder.Eventf(pdb, v1.EventTypeWarning, "NoPods", "Failed to get pods: %v", err)
@ -595,7 +595,7 @@ func (dc *DisruptionController) trySync(pdb *policy.PodDisruptionBudget) error {
dc.recorder.Eventf(pdb, v1.EventTypeNormal, "NoPods", "No matching pods found") dc.recorder.Eventf(pdb, v1.EventTypeNormal, "NoPods", "No matching pods found")
} }
expectedCount, desiredHealthy, unmanagedPods, err := dc.getExpectedPodCount(pdb, pods) expectedCount, desiredHealthy, unmanagedPods, err := dc.getExpectedPodCount(ctx, pdb, pods)
if err != nil { if err != nil {
dc.recorder.Eventf(pdb, v1.EventTypeWarning, "CalculateExpectedPodCountFailed", "Failed to calculate the number of expected pods: %v", err) dc.recorder.Eventf(pdb, v1.EventTypeWarning, "CalculateExpectedPodCountFailed", "Failed to calculate the number of expected pods: %v", err)
return err return err
@ -609,7 +609,7 @@ func (dc *DisruptionController) trySync(pdb *policy.PodDisruptionBudget) error {
currentTime := time.Now() currentTime := time.Now()
disruptedPods, recheckTime := dc.buildDisruptedPodMap(pods, pdb, currentTime) disruptedPods, recheckTime := dc.buildDisruptedPodMap(pods, pdb, currentTime)
currentHealthy := countHealthyPods(pods, disruptedPods, currentTime) currentHealthy := countHealthyPods(pods, disruptedPods, currentTime)
err = dc.updatePdbStatus(pdb, currentHealthy, desiredHealthy, expectedCount, disruptedPods) err = dc.updatePdbStatus(ctx, pdb, currentHealthy, desiredHealthy, expectedCount, disruptedPods)
if err == nil && recheckTime != nil { if err == nil && recheckTime != nil {
// There is always at most one PDB waiting with a particular name in the queue, // There is always at most one PDB waiting with a particular name in the queue,
@ -620,7 +620,7 @@ func (dc *DisruptionController) trySync(pdb *policy.PodDisruptionBudget) error {
return err return err
} }
func (dc *DisruptionController) getExpectedPodCount(pdb *policy.PodDisruptionBudget, pods []*v1.Pod) (expectedCount, desiredHealthy int32, unmanagedPods []string, err error) { func (dc *DisruptionController) getExpectedPodCount(ctx context.Context, pdb *policy.PodDisruptionBudget, pods []*v1.Pod) (expectedCount, desiredHealthy int32, unmanagedPods []string, err error) {
err = nil err = nil
// TODO(davidopp): consider making the way expectedCount and rules about // TODO(davidopp): consider making the way expectedCount and rules about
// permitted controller configurations (specifically, considering it an error // permitted controller configurations (specifically, considering it an error
@ -628,7 +628,7 @@ func (dc *DisruptionController) getExpectedPodCount(pdb *policy.PodDisruptionBud
// handled the same way for integer and percentage minAvailable // handled the same way for integer and percentage minAvailable
if pdb.Spec.MaxUnavailable != nil { if pdb.Spec.MaxUnavailable != nil {
expectedCount, unmanagedPods, err = dc.getExpectedScale(pdb, pods) expectedCount, unmanagedPods, err = dc.getExpectedScale(ctx, pdb, pods)
if err != nil { if err != nil {
return return
} }
@ -646,7 +646,7 @@ func (dc *DisruptionController) getExpectedPodCount(pdb *policy.PodDisruptionBud
desiredHealthy = pdb.Spec.MinAvailable.IntVal desiredHealthy = pdb.Spec.MinAvailable.IntVal
expectedCount = int32(len(pods)) expectedCount = int32(len(pods))
} else if pdb.Spec.MinAvailable.Type == intstr.String { } else if pdb.Spec.MinAvailable.Type == intstr.String {
expectedCount, unmanagedPods, err = dc.getExpectedScale(pdb, pods) expectedCount, unmanagedPods, err = dc.getExpectedScale(ctx, pdb, pods)
if err != nil { if err != nil {
return return
} }
@ -662,7 +662,7 @@ func (dc *DisruptionController) getExpectedPodCount(pdb *policy.PodDisruptionBud
return return
} }
func (dc *DisruptionController) getExpectedScale(pdb *policy.PodDisruptionBudget, pods []*v1.Pod) (expectedCount int32, unmanagedPods []string, err error) { func (dc *DisruptionController) getExpectedScale(ctx context.Context, pdb *policy.PodDisruptionBudget, pods []*v1.Pod) (expectedCount int32, unmanagedPods []string, err error) {
// When the user specifies a fraction of pods that must be available, we // When the user specifies a fraction of pods that must be available, we
// use as the fraction's denominator // use as the fraction's denominator
// SUM_{all c in C} scale(c) // SUM_{all c in C} scale(c)
@ -701,7 +701,7 @@ func (dc *DisruptionController) getExpectedScale(pdb *policy.PodDisruptionBudget
foundController := false foundController := false
for _, finder := range dc.finders() { for _, finder := range dc.finders() {
var controllerNScale *controllerAndScale var controllerNScale *controllerAndScale
controllerNScale, err = finder(controllerRef, pod.Namespace) controllerNScale, err = finder(ctx, controllerRef, pod.Namespace)
if err != nil { if err != nil {
return return
} }
@ -785,7 +785,7 @@ func (dc *DisruptionController) buildDisruptedPodMap(pods []*v1.Pod, pdb *policy
// implement the "fail open" part of the design since if we manage to update // implement the "fail open" part of the design since if we manage to update
// this field correctly, we will prevent the /evict handler from approving an // this field correctly, we will prevent the /evict handler from approving an
// eviction when it may be unsafe to do so. // eviction when it may be unsafe to do so.
func (dc *DisruptionController) failSafe(pdb *policy.PodDisruptionBudget, err error) error { func (dc *DisruptionController) failSafe(ctx context.Context, pdb *policy.PodDisruptionBudget, err error) error {
newPdb := pdb.DeepCopy() newPdb := pdb.DeepCopy()
newPdb.Status.DisruptionsAllowed = 0 newPdb.Status.DisruptionsAllowed = 0
@ -800,10 +800,10 @@ func (dc *DisruptionController) failSafe(pdb *policy.PodDisruptionBudget, err er
ObservedGeneration: newPdb.Status.ObservedGeneration, ObservedGeneration: newPdb.Status.ObservedGeneration,
}) })
return dc.getUpdater()(newPdb) return dc.getUpdater()(ctx, newPdb)
} }
func (dc *DisruptionController) updatePdbStatus(pdb *policy.PodDisruptionBudget, currentHealthy, desiredHealthy, expectedCount int32, func (dc *DisruptionController) updatePdbStatus(ctx context.Context, pdb *policy.PodDisruptionBudget, currentHealthy, desiredHealthy, expectedCount int32,
disruptedPods map[string]metav1.Time) error { disruptedPods map[string]metav1.Time) error {
// We require expectedCount to be > 0 so that PDBs which currently match no // We require expectedCount to be > 0 so that PDBs which currently match no
@ -837,12 +837,12 @@ func (dc *DisruptionController) updatePdbStatus(pdb *policy.PodDisruptionBudget,
pdbhelper.UpdateDisruptionAllowedCondition(newPdb) pdbhelper.UpdateDisruptionAllowedCondition(newPdb)
return dc.getUpdater()(newPdb) return dc.getUpdater()(ctx, newPdb)
} }
func (dc *DisruptionController) writePdbStatus(pdb *policy.PodDisruptionBudget) error { func (dc *DisruptionController) writePdbStatus(ctx context.Context, pdb *policy.PodDisruptionBudget) error {
// If this update fails, don't retry it. Allow the failure to get handled & // If this update fails, don't retry it. Allow the failure to get handled &
// retried in `processNextWorkItem()`. // retried in `processNextWorkItem()`.
_, err := dc.kubeClient.PolicyV1().PodDisruptionBudgets(pdb.Namespace).UpdateStatus(context.TODO(), pdb, metav1.UpdateOptions{}) _, err := dc.kubeClient.PolicyV1().PodDisruptionBudgets(pdb.Namespace).UpdateStatus(ctx, pdb, metav1.UpdateOptions{})
return err return err
} }

View File

@ -59,7 +59,7 @@ type pdbStates map[string]policy.PodDisruptionBudget
var alwaysReady = func() bool { return true } var alwaysReady = func() bool { return true }
func (ps *pdbStates) Set(pdb *policy.PodDisruptionBudget) error { func (ps *pdbStates) Set(ctx context.Context, pdb *policy.PodDisruptionBudget) error {
key, err := controller.KeyFunc(pdb) key, err := controller.KeyFunc(pdb)
if err != nil { if err != nil {
return err return err
@ -178,8 +178,8 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) {
dc.rsListerSynced = alwaysReady dc.rsListerSynced = alwaysReady
dc.dListerSynced = alwaysReady dc.dListerSynced = alwaysReady
dc.ssListerSynced = alwaysReady dc.ssListerSynced = alwaysReady
ctx := context.TODO()
informerFactory.Start(context.TODO().Done()) informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(nil) informerFactory.WaitForCacheSync(nil)
return &disruptionController{ return &disruptionController{
@ -421,11 +421,12 @@ func TestNoSelector(t *testing.T) {
pod, _ := newPod(t, "yo-yo-yo") pod, _ := newPod(t, "yo-yo-yo")
add(t, dc.pdbStore, pdb) add(t, dc.pdbStore, pdb)
dc.sync(pdbName) ctx := context.TODO()
dc.sync(ctx, pdbName)
ps.VerifyPdbStatus(t, pdbName, 0, 0, 3, 0, map[string]metav1.Time{}) ps.VerifyPdbStatus(t, pdbName, 0, 0, 3, 0, map[string]metav1.Time{})
add(t, dc.podStore, pod) add(t, dc.podStore, pod)
dc.sync(pdbName) dc.sync(ctx, pdbName)
ps.VerifyPdbStatus(t, pdbName, 0, 1, 3, 1, map[string]metav1.Time{}) ps.VerifyPdbStatus(t, pdbName, 0, 1, 3, 1, map[string]metav1.Time{})
} }
@ -435,8 +436,9 @@ func TestUnavailable(t *testing.T) {
dc, ps := newFakeDisruptionController() dc, ps := newFakeDisruptionController()
pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromInt(3)) pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromInt(3))
ctx := context.TODO()
add(t, dc.pdbStore, pdb) add(t, dc.pdbStore, pdb)
dc.sync(pdbName) dc.sync(ctx, pdbName)
// Add three pods, verifying that the counts go up at each step. // Add three pods, verifying that the counts go up at each step.
pods := []*v1.Pod{} pods := []*v1.Pod{}
@ -445,14 +447,14 @@ func TestUnavailable(t *testing.T) {
pod, _ := newPod(t, fmt.Sprintf("yo-yo-yo %d", i)) pod, _ := newPod(t, fmt.Sprintf("yo-yo-yo %d", i))
pods = append(pods, pod) pods = append(pods, pod)
add(t, dc.podStore, pod) add(t, dc.podStore, pod)
dc.sync(pdbName) dc.sync(ctx, pdbName)
} }
ps.VerifyPdbStatus(t, pdbName, 1, 4, 3, 4, map[string]metav1.Time{}) ps.VerifyPdbStatus(t, pdbName, 1, 4, 3, 4, map[string]metav1.Time{})
// Now set one pod as unavailable // Now set one pod as unavailable
pods[0].Status.Conditions = []v1.PodCondition{} pods[0].Status.Conditions = []v1.PodCondition{}
update(t, dc.podStore, pods[0]) update(t, dc.podStore, pods[0])
dc.sync(pdbName) dc.sync(ctx, pdbName)
// Verify expected update // Verify expected update
ps.VerifyPdbStatus(t, pdbName, 0, 3, 3, 4, map[string]metav1.Time{}) ps.VerifyPdbStatus(t, pdbName, 0, 3, 3, 4, map[string]metav1.Time{})
@ -465,13 +467,14 @@ func TestIntegerMaxUnavailable(t *testing.T) {
pdb, pdbName := newMaxUnavailablePodDisruptionBudget(t, intstr.FromInt(1)) pdb, pdbName := newMaxUnavailablePodDisruptionBudget(t, intstr.FromInt(1))
add(t, dc.pdbStore, pdb) add(t, dc.pdbStore, pdb)
dc.sync(pdbName) ctx := context.TODO()
dc.sync(ctx, pdbName)
// This verifies that when a PDB has 0 pods, disruptions are not allowed. // This verifies that when a PDB has 0 pods, disruptions are not allowed.
ps.VerifyDisruptionAllowed(t, pdbName, 0) ps.VerifyDisruptionAllowed(t, pdbName, 0)
pod, _ := newPod(t, "naked") pod, _ := newPod(t, "naked")
add(t, dc.podStore, pod) add(t, dc.podStore, pod)
dc.sync(pdbName) dc.sync(ctx, pdbName)
ps.VerifyDisruptionAllowed(t, pdbName, 0) ps.VerifyDisruptionAllowed(t, pdbName, 0)
} }
@ -489,15 +492,16 @@ func TestIntegerMaxUnavailableWithScaling(t *testing.T) {
pod, _ := newPod(t, "pod") pod, _ := newPod(t, "pod")
updatePodOwnerToRs(t, pod, rs) updatePodOwnerToRs(t, pod, rs)
ctx := context.TODO()
add(t, dc.podStore, pod) add(t, dc.podStore, pod)
dc.sync(pdbName) dc.sync(ctx, pdbName)
ps.VerifyPdbStatus(t, pdbName, 0, 1, 5, 7, map[string]metav1.Time{}) ps.VerifyPdbStatus(t, pdbName, 0, 1, 5, 7, map[string]metav1.Time{})
// Update scale of ReplicaSet and check PDB // Update scale of ReplicaSet and check PDB
rs.Spec.Replicas = utilpointer.Int32Ptr(5) rs.Spec.Replicas = utilpointer.Int32Ptr(5)
update(t, dc.rsStore, rs) update(t, dc.rsStore, rs)
dc.sync(pdbName) dc.sync(ctx, pdbName)
ps.VerifyPdbStatus(t, pdbName, 0, 1, 3, 5, map[string]metav1.Time{}) ps.VerifyPdbStatus(t, pdbName, 0, 1, 3, 5, map[string]metav1.Time{})
} }
@ -515,14 +519,15 @@ func TestPercentageMaxUnavailableWithScaling(t *testing.T) {
pod, _ := newPod(t, "pod") pod, _ := newPod(t, "pod")
updatePodOwnerToRs(t, pod, rs) updatePodOwnerToRs(t, pod, rs)
add(t, dc.podStore, pod) add(t, dc.podStore, pod)
dc.sync(pdbName) ctx := context.TODO()
dc.sync(ctx, pdbName)
ps.VerifyPdbStatus(t, pdbName, 0, 1, 4, 7, map[string]metav1.Time{}) ps.VerifyPdbStatus(t, pdbName, 0, 1, 4, 7, map[string]metav1.Time{})
// Update scale of ReplicaSet and check PDB // Update scale of ReplicaSet and check PDB
rs.Spec.Replicas = utilpointer.Int32Ptr(3) rs.Spec.Replicas = utilpointer.Int32Ptr(3)
update(t, dc.rsStore, rs) update(t, dc.rsStore, rs)
dc.sync(pdbName) dc.sync(ctx, pdbName)
ps.VerifyPdbStatus(t, pdbName, 0, 1, 2, 3, map[string]metav1.Time{}) ps.VerifyPdbStatus(t, pdbName, 0, 1, 2, 3, map[string]metav1.Time{})
} }
@ -533,13 +538,14 @@ func TestNakedPod(t *testing.T) {
pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("28%")) pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("28%"))
add(t, dc.pdbStore, pdb) add(t, dc.pdbStore, pdb)
dc.sync(pdbName) ctx := context.TODO()
dc.sync(ctx, pdbName)
// This verifies that when a PDB has 0 pods, disruptions are not allowed. // This verifies that when a PDB has 0 pods, disruptions are not allowed.
ps.VerifyDisruptionAllowed(t, pdbName, 0) ps.VerifyDisruptionAllowed(t, pdbName, 0)
pod, _ := newPod(t, "naked") pod, _ := newPod(t, "naked")
add(t, dc.podStore, pod) add(t, dc.podStore, pod)
dc.sync(pdbName) dc.sync(ctx, pdbName)
ps.VerifyDisruptionAllowed(t, pdbName, 0) ps.VerifyDisruptionAllowed(t, pdbName, 0)
} }
@ -550,13 +556,14 @@ func TestStatusForUnmanagedPod(t *testing.T) {
pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("28%")) pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("28%"))
add(t, dc.pdbStore, pdb) add(t, dc.pdbStore, pdb)
dc.sync(pdbName) ctx := context.TODO()
dc.sync(ctx, pdbName)
// This verifies that when a PDB has 0 pods, disruptions are not allowed. // This verifies that when a PDB has 0 pods, disruptions are not allowed.
ps.VerifyDisruptionAllowed(t, pdbName, 0) ps.VerifyDisruptionAllowed(t, pdbName, 0)
pod, _ := newPod(t, "unmanaged") pod, _ := newPod(t, "unmanaged")
add(t, dc.podStore, pod) add(t, dc.podStore, pod)
dc.sync(pdbName) dc.sync(ctx, pdbName)
ps.VerifyNoStatusError(t, pdbName) ps.VerifyNoStatusError(t, pdbName)
@ -568,16 +575,17 @@ func TestTotalUnmanagedPods(t *testing.T) {
pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("28%")) pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("28%"))
add(t, dc.pdbStore, pdb) add(t, dc.pdbStore, pdb)
dc.sync(pdbName) ctx := context.TODO()
dc.sync(ctx, pdbName)
// This verifies that when a PDB has 0 pods, disruptions are not allowed. // This verifies that when a PDB has 0 pods, disruptions are not allowed.
ps.VerifyDisruptionAllowed(t, pdbName, 0) ps.VerifyDisruptionAllowed(t, pdbName, 0)
pod, _ := newPod(t, "unmanaged") pod, _ := newPod(t, "unmanaged")
add(t, dc.podStore, pod) add(t, dc.podStore, pod)
dc.sync(pdbName) dc.sync(ctx, pdbName)
var pods []*v1.Pod var pods []*v1.Pod
pods = append(pods, pod) pods = append(pods, pod)
_, unmanagedPods, _ := dc.getExpectedScale(pdb, pods) _, unmanagedPods, _ := dc.getExpectedScale(ctx, pdb, pods)
if len(unmanagedPods) != 1 { if len(unmanagedPods) != 1 {
t.Fatalf("expected one pod to be unmanaged pod but found %d", len(unmanagedPods)) t.Fatalf("expected one pod to be unmanaged pod but found %d", len(unmanagedPods))
} }
@ -594,11 +602,11 @@ func TestReplicaSet(t *testing.T) {
rs, _ := newReplicaSet(t, 10) rs, _ := newReplicaSet(t, 10)
add(t, dc.rsStore, rs) add(t, dc.rsStore, rs)
ctx := context.TODO()
pod, _ := newPod(t, "pod") pod, _ := newPod(t, "pod")
updatePodOwnerToRs(t, pod, rs) updatePodOwnerToRs(t, pod, rs)
add(t, dc.podStore, pod) add(t, dc.podStore, pod)
dc.sync(pdbName) dc.sync(ctx, pdbName)
ps.VerifyPdbStatus(t, pdbName, 0, 1, 2, 10, map[string]metav1.Time{}) ps.VerifyPdbStatus(t, pdbName, 0, 1, 2, 10, map[string]metav1.Time{})
} }
@ -639,8 +647,8 @@ func TestScaleResource(t *testing.T) {
}) })
add(t, dc.podStore, pod) add(t, dc.podStore, pod)
} }
ctx := context.TODO()
dc.sync(pdbName) dc.sync(ctx, pdbName)
disruptionsAllowed := int32(0) disruptionsAllowed := int32(0)
if replicas-pods < maxUnavailable { if replicas-pods < maxUnavailable {
disruptionsAllowed = maxUnavailable - (replicas - pods) disruptionsAllowed = maxUnavailable - (replicas - pods)
@ -706,7 +714,7 @@ func TestScaleFinderNoResource(t *testing.T) {
UID: customResourceUID, UID: customResourceUID,
} }
_, err := dc.getScaleController(ownerRef, "default") _, err := dc.getScaleController(context.TODO(), ownerRef, "default")
if tc.expectError && err == nil { if tc.expectError && err == nil {
t.Error("expected error, but didn't get one") t.Error("expected error, but didn't get one")
@ -734,8 +742,8 @@ func TestMultipleControllers(t *testing.T) {
pods = append(pods, pod) pods = append(pods, pod)
add(t, dc.podStore, pod) add(t, dc.podStore, pod)
} }
ctx := context.TODO()
dc.sync(pdbName) dc.sync(ctx, pdbName)
// No controllers yet => no disruption allowed // No controllers yet => no disruption allowed
ps.VerifyDisruptionAllowed(t, pdbName, 0) ps.VerifyDisruptionAllowed(t, pdbName, 0)
@ -746,7 +754,7 @@ func TestMultipleControllers(t *testing.T) {
updatePodOwnerToRc(t, pods[i], rc) updatePodOwnerToRc(t, pods[i], rc)
} }
add(t, dc.rcStore, rc) add(t, dc.rcStore, rc)
dc.sync(pdbName) dc.sync(ctx, pdbName)
// One RC and 200%>1% healthy => disruption allowed // One RC and 200%>1% healthy => disruption allowed
ps.VerifyDisruptionAllowed(t, pdbName, 1) ps.VerifyDisruptionAllowed(t, pdbName, 1)
@ -756,7 +764,7 @@ func TestMultipleControllers(t *testing.T) {
updatePodOwnerToRc(t, pods[i], rc) updatePodOwnerToRc(t, pods[i], rc)
} }
add(t, dc.rcStore, rc) add(t, dc.rcStore, rc)
dc.sync(pdbName) dc.sync(ctx, pdbName)
// 100%>1% healthy BUT two RCs => no disruption allowed // 100%>1% healthy BUT two RCs => no disruption allowed
// TODO: Find out if this assert is still needed // TODO: Find out if this assert is still needed
@ -781,7 +789,8 @@ func TestReplicationController(t *testing.T) {
rc, _ := newReplicationController(t, 3) rc, _ := newReplicationController(t, 3)
rc.Spec.Selector = labels rc.Spec.Selector = labels
add(t, dc.rcStore, rc) add(t, dc.rcStore, rc)
dc.sync(pdbName) ctx := context.TODO()
dc.sync(ctx, pdbName)
// It starts out at 0 expected because, with no pods, the PDB doesn't know // It starts out at 0 expected because, with no pods, the PDB doesn't know
// about the RC. This is a known bug. TODO(mml): file issue // about the RC. This is a known bug. TODO(mml): file issue
@ -792,7 +801,7 @@ func TestReplicationController(t *testing.T) {
updatePodOwnerToRc(t, pod, rc) updatePodOwnerToRc(t, pod, rc)
pod.Labels = labels pod.Labels = labels
add(t, dc.podStore, pod) add(t, dc.podStore, pod)
dc.sync(pdbName) dc.sync(ctx, pdbName)
if i < 2 { if i < 2 {
ps.VerifyPdbStatus(t, pdbName, 0, i+1, 2, 3, map[string]metav1.Time{}) ps.VerifyPdbStatus(t, pdbName, 0, i+1, 2, 3, map[string]metav1.Time{})
} else { } else {
@ -802,7 +811,7 @@ func TestReplicationController(t *testing.T) {
rogue, _ := newPod(t, "rogue") rogue, _ := newPod(t, "rogue")
add(t, dc.podStore, rogue) add(t, dc.podStore, rogue)
dc.sync(pdbName) dc.sync(ctx, pdbName)
ps.VerifyDisruptionAllowed(t, pdbName, 2) ps.VerifyDisruptionAllowed(t, pdbName, 2)
} }
@ -819,7 +828,8 @@ func TestStatefulSetController(t *testing.T) {
add(t, dc.pdbStore, pdb) add(t, dc.pdbStore, pdb)
ss, _ := newStatefulSet(t, 3) ss, _ := newStatefulSet(t, 3)
add(t, dc.ssStore, ss) add(t, dc.ssStore, ss)
dc.sync(pdbName) ctx := context.TODO()
dc.sync(ctx, pdbName)
// It starts out at 0 expected because, with no pods, the PDB doesn't know // It starts out at 0 expected because, with no pods, the PDB doesn't know
// about the SS. This is a known bug. TODO(mml): file issue // about the SS. This is a known bug. TODO(mml): file issue
@ -830,7 +840,7 @@ func TestStatefulSetController(t *testing.T) {
updatePodOwnerToSs(t, pod, ss) updatePodOwnerToSs(t, pod, ss)
pod.Labels = labels pod.Labels = labels
add(t, dc.podStore, pod) add(t, dc.podStore, pod)
dc.sync(pdbName) dc.sync(ctx, pdbName)
if i < 2 { if i < 2 {
ps.VerifyPdbStatus(t, pdbName, 0, i+1, 2, 3, map[string]metav1.Time{}) ps.VerifyPdbStatus(t, pdbName, 0, i+1, 2, 3, map[string]metav1.Time{})
} else { } else {
@ -865,7 +875,8 @@ func TestTwoControllers(t *testing.T) {
rc, _ := newReplicationController(t, collectionSize) rc, _ := newReplicationController(t, collectionSize)
rc.Spec.Selector = rcLabels rc.Spec.Selector = rcLabels
add(t, dc.rcStore, rc) add(t, dc.rcStore, rc)
dc.sync(pdbName) ctx := context.TODO()
dc.sync(ctx, pdbName)
ps.VerifyPdbStatus(t, pdbName, 0, 0, 0, 0, map[string]metav1.Time{}) ps.VerifyPdbStatus(t, pdbName, 0, 0, 0, 0, map[string]metav1.Time{})
@ -881,7 +892,7 @@ func TestTwoControllers(t *testing.T) {
pod.Status.Conditions = []v1.PodCondition{} pod.Status.Conditions = []v1.PodCondition{}
} }
add(t, dc.podStore, pod) add(t, dc.podStore, pod)
dc.sync(pdbName) dc.sync(ctx, pdbName)
if i <= unavailablePods { if i <= unavailablePods {
ps.VerifyPdbStatus(t, pdbName, 0, 0, minimumOne, collectionSize, map[string]metav1.Time{}) ps.VerifyPdbStatus(t, pdbName, 0, 0, minimumOne, collectionSize, map[string]metav1.Time{})
} else if i-unavailablePods <= minimumOne { } else if i-unavailablePods <= minimumOne {
@ -894,14 +905,14 @@ func TestTwoControllers(t *testing.T) {
d, _ := newDeployment(t, collectionSize) d, _ := newDeployment(t, collectionSize)
d.Spec.Selector = newSel(dLabels) d.Spec.Selector = newSel(dLabels)
add(t, dc.dStore, d) add(t, dc.dStore, d)
dc.sync(pdbName) dc.sync(ctx, pdbName)
ps.VerifyPdbStatus(t, pdbName, 1, minimumOne+1, minimumOne, collectionSize, map[string]metav1.Time{}) ps.VerifyPdbStatus(t, pdbName, 1, minimumOne+1, minimumOne, collectionSize, map[string]metav1.Time{})
rs, _ := newReplicaSet(t, collectionSize) rs, _ := newReplicaSet(t, collectionSize)
rs.Spec.Selector = newSel(dLabels) rs.Spec.Selector = newSel(dLabels)
rs.Labels = dLabels rs.Labels = dLabels
add(t, dc.rsStore, rs) add(t, dc.rsStore, rs)
dc.sync(pdbName) dc.sync(ctx, pdbName)
ps.VerifyPdbStatus(t, pdbName, 1, minimumOne+1, minimumOne, collectionSize, map[string]metav1.Time{}) ps.VerifyPdbStatus(t, pdbName, 1, minimumOne+1, minimumOne, collectionSize, map[string]metav1.Time{})
// By the end of this loop, the number of ready pods should be N+2 (hence minimumTwo+2). // By the end of this loop, the number of ready pods should be N+2 (hence minimumTwo+2).
@ -915,7 +926,7 @@ func TestTwoControllers(t *testing.T) {
pod.Status.Conditions = []v1.PodCondition{} pod.Status.Conditions = []v1.PodCondition{}
} }
add(t, dc.podStore, pod) add(t, dc.podStore, pod)
dc.sync(pdbName) dc.sync(ctx, pdbName)
if i <= unavailablePods { if i <= unavailablePods {
ps.VerifyPdbStatus(t, pdbName, 0, minimumOne+1, minimumTwo, 2*collectionSize, map[string]metav1.Time{}) ps.VerifyPdbStatus(t, pdbName, 0, minimumOne+1, minimumTwo, 2*collectionSize, map[string]metav1.Time{})
} else if i-unavailablePods <= minimumTwo-(minimumOne+1) { } else if i-unavailablePods <= minimumTwo-(minimumOne+1) {
@ -932,17 +943,17 @@ func TestTwoControllers(t *testing.T) {
ps.VerifyPdbStatus(t, pdbName, 2, 2+minimumTwo, minimumTwo, 2*collectionSize, map[string]metav1.Time{}) ps.VerifyPdbStatus(t, pdbName, 2, 2+minimumTwo, minimumTwo, 2*collectionSize, map[string]metav1.Time{})
pods[collectionSize-1].Status.Conditions = []v1.PodCondition{} pods[collectionSize-1].Status.Conditions = []v1.PodCondition{}
update(t, dc.podStore, pods[collectionSize-1]) update(t, dc.podStore, pods[collectionSize-1])
dc.sync(pdbName) dc.sync(ctx, pdbName)
ps.VerifyPdbStatus(t, pdbName, 1, 1+minimumTwo, minimumTwo, 2*collectionSize, map[string]metav1.Time{}) ps.VerifyPdbStatus(t, pdbName, 1, 1+minimumTwo, minimumTwo, 2*collectionSize, map[string]metav1.Time{})
pods[collectionSize-2].Status.Conditions = []v1.PodCondition{} pods[collectionSize-2].Status.Conditions = []v1.PodCondition{}
update(t, dc.podStore, pods[collectionSize-2]) update(t, dc.podStore, pods[collectionSize-2])
dc.sync(pdbName) dc.sync(ctx, pdbName)
ps.VerifyPdbStatus(t, pdbName, 0, minimumTwo, minimumTwo, 2*collectionSize, map[string]metav1.Time{}) ps.VerifyPdbStatus(t, pdbName, 0, minimumTwo, minimumTwo, 2*collectionSize, map[string]metav1.Time{})
pods[collectionSize-1].Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}} pods[collectionSize-1].Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}}
update(t, dc.podStore, pods[collectionSize-1]) update(t, dc.podStore, pods[collectionSize-1])
dc.sync(pdbName) dc.sync(ctx, pdbName)
ps.VerifyPdbStatus(t, pdbName, 1, 1+minimumTwo, minimumTwo, 2*collectionSize, map[string]metav1.Time{}) ps.VerifyPdbStatus(t, pdbName, 1, 1+minimumTwo, minimumTwo, 2*collectionSize, map[string]metav1.Time{})
} }
@ -951,7 +962,7 @@ func TestPDBNotExist(t *testing.T) {
dc, _ := newFakeDisruptionController() dc, _ := newFakeDisruptionController()
pdb, _ := newMinAvailablePodDisruptionBudget(t, intstr.FromString("67%")) pdb, _ := newMinAvailablePodDisruptionBudget(t, intstr.FromString("67%"))
add(t, dc.pdbStore, pdb) add(t, dc.pdbStore, pdb)
if err := dc.sync("notExist"); err != nil { if err := dc.sync(context.TODO(), "notExist"); err != nil {
t.Errorf("Unexpected error: %v, expect nil", err) t.Errorf("Unexpected error: %v, expect nil", err)
} }
} }
@ -978,7 +989,7 @@ func TestUpdateDisruptedPods(t *testing.T) {
add(t, dc.podStore, pod2) add(t, dc.podStore, pod2)
add(t, dc.podStore, pod3) add(t, dc.podStore, pod3)
dc.sync(pdbName) dc.sync(context.TODO(), pdbName)
ps.VerifyPdbStatus(t, pdbName, 0, 1, 1, 3, map[string]metav1.Time{"p3": {Time: currentTime}}) ps.VerifyPdbStatus(t, pdbName, 0, 1, 1, 3, map[string]metav1.Time{"p3": {Time: currentTime}})
} }
@ -1064,7 +1075,7 @@ func TestBasicFinderFunctions(t *testing.T) {
UID: tc.uid, UID: tc.uid,
} }
controllerAndScale, _ := tc.finderFunc(controllerRef, metav1.NamespaceDefault) controllerAndScale, _ := tc.finderFunc(context.TODO(), controllerRef, metav1.NamespaceDefault)
if controllerAndScale == nil { if controllerAndScale == nil {
if tc.findsScale { if tc.findsScale {
@ -1162,7 +1173,7 @@ func TestDeploymentFinderFunction(t *testing.T) {
UID: rs.UID, UID: rs.UID,
} }
controllerAndScale, _ := dc.getPodDeployment(controllerRef, metav1.NamespaceDefault) controllerAndScale, _ := dc.getPodDeployment(context.TODO(), controllerRef, metav1.NamespaceDefault)
if controllerAndScale == nil { if controllerAndScale == nil {
if tc.findsScale { if tc.findsScale {
@ -1199,17 +1210,17 @@ func TestUpdatePDBStatusRetries(t *testing.T) {
dc, _ := newFakeDisruptionController() dc, _ := newFakeDisruptionController()
// Inject the production code over our fake impl // Inject the production code over our fake impl
dc.getUpdater = func() updater { return dc.writePdbStatus } dc.getUpdater = func() updater { return dc.writePdbStatus }
ctx := context.TODO()
// Create a PDB and 3 pods that match it. // Create a PDB and 3 pods that match it.
pdb, pdbKey := newMinAvailablePodDisruptionBudget(t, intstr.FromInt(1)) pdb, pdbKey := newMinAvailablePodDisruptionBudget(t, intstr.FromInt(1))
pdb, err := dc.coreClient.PolicyV1().PodDisruptionBudgets(pdb.Namespace).Create(context.TODO(), pdb, metav1.CreateOptions{}) pdb, err := dc.coreClient.PolicyV1().PodDisruptionBudgets(pdb.Namespace).Create(ctx, pdb, metav1.CreateOptions{})
if err != nil { if err != nil {
t.Fatalf("Failed to create PDB: %v", err) t.Fatalf("Failed to create PDB: %v", err)
} }
podNames := []string{"moe", "larry", "curly"} podNames := []string{"moe", "larry", "curly"}
for _, name := range podNames { for _, name := range podNames {
pod, _ := newPod(t, name) pod, _ := newPod(t, name)
_, err := dc.coreClient.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{}) _, err := dc.coreClient.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
if err != nil { if err != nil {
t.Fatalf("Failed to create pod: %v", err) t.Fatalf("Failed to create pod: %v", err)
} }
@ -1228,7 +1239,7 @@ func TestUpdatePDBStatusRetries(t *testing.T) {
} }
// Sync DisruptionController once to update PDB status. // Sync DisruptionController once to update PDB status.
if err := dc.sync(pdbKey); err != nil { if err := dc.sync(ctx, pdbKey); err != nil {
t.Fatalf("Failed initial sync: %v", err) t.Fatalf("Failed initial sync: %v", err)
} }
@ -1281,7 +1292,7 @@ func TestUpdatePDBStatusRetries(t *testing.T) {
}) })
// (A) Delete one pod // (A) Delete one pod
if err := dc.coreClient.CoreV1().Pods("default").Delete(context.TODO(), podNames[0], metav1.DeleteOptions{}); err != nil { if err := dc.coreClient.CoreV1().Pods("default").Delete(ctx, podNames[0], metav1.DeleteOptions{}); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := waitForCacheCount(dc.podStore, len(podNames)-1); err != nil { if err := waitForCacheCount(dc.podStore, len(podNames)-1); err != nil {
@ -1291,7 +1302,7 @@ func TestUpdatePDBStatusRetries(t *testing.T) {
// The sync() function should either write a correct status which takes the // The sync() function should either write a correct status which takes the
// evictions into account, or re-queue the PDB for another sync (by returning // evictions into account, or re-queue the PDB for another sync (by returning
// an error) // an error)
if err := dc.sync(pdbKey); err != nil { if err := dc.sync(ctx, pdbKey); err != nil {
t.Logf("sync() returned with error: %v", err) t.Logf("sync() returned with error: %v", err)
} else { } else {
t.Logf("sync() returned with no error") t.Logf("sync() returned with no error")
@ -1299,7 +1310,7 @@ func TestUpdatePDBStatusRetries(t *testing.T) {
// (C) Whether or not sync() returned an error, the PDB status should reflect // (C) Whether or not sync() returned an error, the PDB status should reflect
// the evictions that took place. // the evictions that took place.
finalPDB, err := dc.coreClient.PolicyV1().PodDisruptionBudgets("default").Get(context.TODO(), pdb.Name, metav1.GetOptions{}) finalPDB, err := dc.coreClient.PolicyV1().PodDisruptionBudgets("default").Get(ctx, pdb.Name, metav1.GetOptions{})
if err != nil { if err != nil {
t.Fatalf("Failed to get PDB: %v", err) t.Fatalf("Failed to get PDB: %v", err)
} }
@ -1309,6 +1320,8 @@ func TestUpdatePDBStatusRetries(t *testing.T) {
} }
func TestInvalidSelectors(t *testing.T) { func TestInvalidSelectors(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
testCases := map[string]struct { testCases := map[string]struct {
labelSelector *metav1.LabelSelector labelSelector *metav1.LabelSelector
}{ }{
@ -1340,7 +1353,7 @@ func TestInvalidSelectors(t *testing.T) {
pdb.Spec.Selector = tc.labelSelector pdb.Spec.Selector = tc.labelSelector
add(t, dc.pdbStore, pdb) add(t, dc.pdbStore, pdb)
dc.sync(pdbName) dc.sync(ctx, pdbName)
ps.VerifyPdbStatus(t, pdbName, 0, 0, 0, 0, map[string]metav1.Time{}) ps.VerifyPdbStatus(t, pdbName, 0, 0, 0, 0, map[string]metav1.Time{})
}) })
} }

View File

@ -98,14 +98,12 @@ func setup(t *testing.T) (*kubeapiservertesting.TestServer, *disruption.Disrupti
func TestPDBWithScaleSubresource(t *testing.T) { func TestPDBWithScaleSubresource(t *testing.T) {
s, pdbc, informers, clientSet, apiExtensionClient, dynamicClient := setup(t) s, pdbc, informers, clientSet, apiExtensionClient, dynamicClient := setup(t)
defer s.TearDownFn() defer s.TearDownFn()
ctx := context.TODO()
nsName := "pdb-scale-subresource" nsName := "pdb-scale-subresource"
createNs(t, nsName, clientSet) createNs(ctx, t, nsName, clientSet)
stopCh := make(chan struct{}) informers.Start(ctx.Done())
informers.Start(stopCh) go pdbc.Run(ctx)
go pdbc.Run(stopCh)
defer close(stopCh)
crdDefinition := newCustomResourceDefinition() crdDefinition := newCustomResourceDefinition()
etcd.CreateTestCRDs(t, apiExtensionClient, true, crdDefinition) etcd.CreateTestCRDs(t, apiExtensionClient, true, crdDefinition)
@ -128,7 +126,7 @@ func TestPDBWithScaleSubresource(t *testing.T) {
}, },
}, },
} }
createdResource, err := resourceClient.Create(context.TODO(), resource, metav1.CreateOptions{}) createdResource, err := resourceClient.Create(ctx, resource, metav1.CreateOptions{})
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
@ -144,7 +142,7 @@ func TestPDBWithScaleSubresource(t *testing.T) {
}, },
} }
for i := 0; i < replicas; i++ { for i := 0; i < replicas; i++ {
createPod(t, fmt.Sprintf("pod-%d", i), nsName, map[string]string{"app": "test-crd"}, clientSet, ownerRefs) createPod(ctx, t, fmt.Sprintf("pod-%d", i), nsName, map[string]string{"app": "test-crd"}, clientSet, ownerRefs)
} }
waitToObservePods(t, informers.Core().V1().Pods().Informer(), 4, v1.PodRunning) waitToObservePods(t, informers.Core().V1().Pods().Informer(), 4, v1.PodRunning)
@ -163,13 +161,13 @@ func TestPDBWithScaleSubresource(t *testing.T) {
}, },
}, },
} }
if _, err := clientSet.PolicyV1().PodDisruptionBudgets(nsName).Create(context.TODO(), pdb, metav1.CreateOptions{}); err != nil { if _, err := clientSet.PolicyV1().PodDisruptionBudgets(nsName).Create(ctx, pdb, metav1.CreateOptions{}); err != nil {
t.Errorf("Error creating PodDisruptionBudget: %v", err) t.Errorf("Error creating PodDisruptionBudget: %v", err)
} }
waitPDBStable(t, clientSet, 4, nsName, pdb.Name) waitPDBStable(ctx, t, clientSet, 4, nsName, pdb.Name)
newPdb, err := clientSet.PolicyV1().PodDisruptionBudgets(nsName).Get(context.TODO(), pdb.Name, metav1.GetOptions{}) newPdb, err := clientSet.PolicyV1().PodDisruptionBudgets(nsName).Get(ctx, pdb.Name, metav1.GetOptions{})
if err != nil { if err != nil {
t.Errorf("Error getting PodDisruptionBudget: %v", err) t.Errorf("Error getting PodDisruptionBudget: %v", err)
} }
@ -186,6 +184,8 @@ func TestPDBWithScaleSubresource(t *testing.T) {
} }
func TestEmptySelector(t *testing.T) { func TestEmptySelector(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
testcases := []struct { testcases := []struct {
name string name string
createPDBFunc func(clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error createPDBFunc func(clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error
@ -203,7 +203,7 @@ func TestEmptySelector(t *testing.T) {
Selector: &metav1.LabelSelector{}, Selector: &metav1.LabelSelector{},
}, },
} }
_, err := clientSet.PolicyV1beta1().PodDisruptionBudgets(nsName).Create(context.TODO(), pdb, metav1.CreateOptions{}) _, err := clientSet.PolicyV1beta1().PodDisruptionBudgets(nsName).Create(ctx, pdb, metav1.CreateOptions{})
return err return err
}, },
expectedCurrentHealthy: 0, expectedCurrentHealthy: 0,
@ -220,7 +220,7 @@ func TestEmptySelector(t *testing.T) {
Selector: &metav1.LabelSelector{}, Selector: &metav1.LabelSelector{},
}, },
} }
_, err := clientSet.PolicyV1().PodDisruptionBudgets(nsName).Create(context.TODO(), pdb, metav1.CreateOptions{}) _, err := clientSet.PolicyV1().PodDisruptionBudgets(nsName).Create(ctx, pdb, metav1.CreateOptions{})
return err return err
}, },
expectedCurrentHealthy: 4, expectedCurrentHealthy: 4,
@ -233,18 +233,16 @@ func TestEmptySelector(t *testing.T) {
defer s.TearDownFn() defer s.TearDownFn()
nsName := fmt.Sprintf("pdb-empty-selector-%d", i) nsName := fmt.Sprintf("pdb-empty-selector-%d", i)
createNs(t, nsName, clientSet) createNs(ctx, t, nsName, clientSet)
stopCh := make(chan struct{}) informers.Start(ctx.Done())
informers.Start(stopCh) go pdbc.Run(ctx)
go pdbc.Run(stopCh)
defer close(stopCh)
replicas := 4 replicas := 4
minAvailable := intstr.FromInt(2) minAvailable := intstr.FromInt(2)
for j := 0; j < replicas; j++ { for j := 0; j < replicas; j++ {
createPod(t, fmt.Sprintf("pod-%d", j), nsName, map[string]string{"app": "test-crd"}, createPod(ctx, t, fmt.Sprintf("pod-%d", j), nsName, map[string]string{"app": "test-crd"},
clientSet, []metav1.OwnerReference{}) clientSet, []metav1.OwnerReference{})
} }
@ -255,9 +253,9 @@ func TestEmptySelector(t *testing.T) {
t.Errorf("Error creating PodDisruptionBudget: %v", err) t.Errorf("Error creating PodDisruptionBudget: %v", err)
} }
waitPDBStable(t, clientSet, tc.expectedCurrentHealthy, nsName, pdbName) waitPDBStable(ctx, t, clientSet, tc.expectedCurrentHealthy, nsName, pdbName)
newPdb, err := clientSet.PolicyV1().PodDisruptionBudgets(nsName).Get(context.TODO(), pdbName, metav1.GetOptions{}) newPdb, err := clientSet.PolicyV1().PodDisruptionBudgets(nsName).Get(ctx, pdbName, metav1.GetOptions{})
if err != nil { if err != nil {
t.Errorf("Error getting PodDisruptionBudget: %v", err) t.Errorf("Error getting PodDisruptionBudget: %v", err)
} }
@ -270,6 +268,8 @@ func TestEmptySelector(t *testing.T) {
} }
func TestSelectorsForPodsWithoutLabels(t *testing.T) { func TestSelectorsForPodsWithoutLabels(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
testcases := []struct { testcases := []struct {
name string name string
createPDBFunc func(clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error createPDBFunc func(clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error
@ -311,7 +311,7 @@ func TestSelectorsForPodsWithoutLabels(t *testing.T) {
}, },
}, },
} }
_, err := clientSet.PolicyV1().PodDisruptionBudgets(nsName).Create(context.TODO(), pdb, metav1.CreateOptions{}) _, err := clientSet.PolicyV1().PodDisruptionBudgets(nsName).Create(ctx, pdb, metav1.CreateOptions{})
return err return err
}, },
expectedCurrentHealthy: 1, expectedCurrentHealthy: 1,
@ -335,7 +335,7 @@ func TestSelectorsForPodsWithoutLabels(t *testing.T) {
}, },
}, },
} }
_, err := clientSet.PolicyV1beta1().PodDisruptionBudgets(nsName).Create(context.TODO(), pdb, metav1.CreateOptions{}) _, err := clientSet.PolicyV1beta1().PodDisruptionBudgets(nsName).Create(ctx, pdb, metav1.CreateOptions{})
return err return err
}, },
expectedCurrentHealthy: 1, expectedCurrentHealthy: 1,
@ -348,12 +348,10 @@ func TestSelectorsForPodsWithoutLabels(t *testing.T) {
defer s.TearDownFn() defer s.TearDownFn()
nsName := fmt.Sprintf("pdb-selectors-%d", i) nsName := fmt.Sprintf("pdb-selectors-%d", i)
createNs(t, nsName, clientSet) createNs(ctx, t, nsName, clientSet)
stopCh := make(chan struct{}) informers.Start(ctx.Done())
informers.Start(stopCh) go pdbc.Run(ctx)
go pdbc.Run(stopCh)
defer close(stopCh)
minAvailable := intstr.FromInt(1) minAvailable := intstr.FromInt(1)
@ -362,16 +360,16 @@ func TestSelectorsForPodsWithoutLabels(t *testing.T) {
if err := tc.createPDBFunc(clientSet, pdbName, nsName, minAvailable); err != nil { if err := tc.createPDBFunc(clientSet, pdbName, nsName, minAvailable); err != nil {
t.Errorf("Error creating PodDisruptionBudget: %v", err) t.Errorf("Error creating PodDisruptionBudget: %v", err)
} }
waitPDBStable(t, clientSet, 0, nsName, pdbName) waitPDBStable(ctx, t, clientSet, 0, nsName, pdbName)
// Create a pod and wait for it be reach the running phase. // Create a pod and wait for it be reach the running phase.
createPod(t, "pod", nsName, map[string]string{}, clientSet, []metav1.OwnerReference{}) createPod(ctx, t, "pod", nsName, map[string]string{}, clientSet, []metav1.OwnerReference{})
waitToObservePods(t, informers.Core().V1().Pods().Informer(), 1, v1.PodRunning) waitToObservePods(t, informers.Core().V1().Pods().Informer(), 1, v1.PodRunning)
// Then verify that the added pod are picked up by the disruption controller. // Then verify that the added pod are picked up by the disruption controller.
waitPDBStable(t, clientSet, 1, nsName, pdbName) waitPDBStable(ctx, t, clientSet, 1, nsName, pdbName)
newPdb, err := clientSet.PolicyV1().PodDisruptionBudgets(nsName).Get(context.TODO(), pdbName, metav1.GetOptions{}) newPdb, err := clientSet.PolicyV1().PodDisruptionBudgets(nsName).Get(ctx, pdbName, metav1.GetOptions{})
if err != nil { if err != nil {
t.Errorf("Error getting PodDisruptionBudget: %v", err) t.Errorf("Error getting PodDisruptionBudget: %v", err)
} }
@ -383,7 +381,7 @@ func TestSelectorsForPodsWithoutLabels(t *testing.T) {
} }
} }
func createPod(t *testing.T, name, namespace string, labels map[string]string, clientSet clientset.Interface, ownerRefs []metav1.OwnerReference) { func createPod(ctx context.Context, t *testing.T, name, namespace string, labels map[string]string, clientSet clientset.Interface, ownerRefs []metav1.OwnerReference) {
pod := &v1.Pod{ pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: name, Name: name,
@ -400,7 +398,7 @@ func createPod(t *testing.T, name, namespace string, labels map[string]string, c
}, },
}, },
} }
_, err := clientSet.CoreV1().Pods(namespace).Create(context.TODO(), pod, metav1.CreateOptions{}) _, err := clientSet.CoreV1().Pods(namespace).Create(ctx, pod, metav1.CreateOptions{})
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
@ -410,8 +408,8 @@ func createPod(t *testing.T, name, namespace string, labels map[string]string, c
} }
} }
func createNs(t *testing.T, name string, clientSet clientset.Interface) { func createNs(ctx context.Context, t *testing.T, name string, clientSet clientset.Interface) {
_, err := clientSet.CoreV1().Namespaces().Create(context.TODO(), &v1.Namespace{ _, err := clientSet.CoreV1().Namespaces().Create(ctx, &v1.Namespace{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: name, Name: name,
}, },
@ -463,9 +461,9 @@ func newCustomResourceDefinition() *apiextensionsv1.CustomResourceDefinition {
} }
} }
func waitPDBStable(t *testing.T, clientSet clientset.Interface, podNum int32, ns, pdbName string) { func waitPDBStable(ctx context.Context, t *testing.T, clientSet clientset.Interface, podNum int32, ns, pdbName string) {
if err := wait.PollImmediate(2*time.Second, 60*time.Second, func() (bool, error) { if err := wait.PollImmediate(2*time.Second, 60*time.Second, func() (bool, error) {
pdb, err := clientSet.PolicyV1beta1().PodDisruptionBudgets(ns).Get(context.TODO(), pdbName, metav1.GetOptions{}) pdb, err := clientSet.PolicyV1beta1().PodDisruptionBudgets(ns).Get(ctx, pdbName, metav1.GetOptions{})
if err != nil { if err != nil {
return false, err return false, err
} }

View File

@ -67,10 +67,10 @@ func TestConcurrentEvictionRequests(t *testing.T) {
ns := framework.CreateTestingNamespace("concurrent-eviction-requests", s, t) ns := framework.CreateTestingNamespace("concurrent-eviction-requests", s, t)
defer framework.DeleteTestingNamespace(ns, s, t) defer framework.DeleteTestingNamespace(ns, s, t)
stopCh := make(chan struct{}) ctx, cancel := context.WithCancel(context.Background())
informers.Start(stopCh) defer cancel()
go rm.Run(stopCh) informers.Start(ctx.Done())
defer close(stopCh) go rm.Run(ctx)
config := restclient.Config{Host: s.URL} config := restclient.Config{Host: s.URL}
clientSet, err := clientset.NewForConfig(&config) clientSet, err := clientset.NewForConfig(&config)
@ -186,10 +186,10 @@ func TestTerminalPodEviction(t *testing.T) {
ns := framework.CreateTestingNamespace("terminalpod-eviction", s, t) ns := framework.CreateTestingNamespace("terminalpod-eviction", s, t)
defer framework.DeleteTestingNamespace(ns, s, t) defer framework.DeleteTestingNamespace(ns, s, t)
stopCh := make(chan struct{}) ctx, cancel := context.WithCancel(context.Background())
informers.Start(stopCh) defer cancel()
go rm.Run(stopCh) informers.Start(ctx.Done())
defer close(stopCh) go rm.Run(ctx)
config := restclient.Config{Host: s.URL} config := restclient.Config{Host: s.URL}
clientSet, err := clientset.NewForConfig(&config) clientSet, err := clientset.NewForConfig(&config)
@ -262,10 +262,10 @@ func TestEvictionVersions(t *testing.T) {
s, closeFn, rm, informers, clientSet := rmSetup(t) s, closeFn, rm, informers, clientSet := rmSetup(t)
defer closeFn() defer closeFn()
stopCh := make(chan struct{}) ctx, cancel := context.WithCancel(context.Background())
informers.Start(stopCh) defer cancel()
go rm.Run(stopCh) informers.Start(ctx.Done())
defer close(stopCh) go rm.Run(ctx)
config := restclient.Config{Host: s.URL} config := restclient.Config{Host: s.URL}

View File

@ -77,7 +77,7 @@ func initDisruptionController(t *testing.T, testCtx *testutils.TestContext) *dis
informers.Start(testCtx.Scheduler.StopEverything) informers.Start(testCtx.Scheduler.StopEverything)
informers.WaitForCacheSync(testCtx.Scheduler.StopEverything) informers.WaitForCacheSync(testCtx.Scheduler.StopEverything)
go dc.Run(testCtx.Scheduler.StopEverything) go dc.Run(testCtx.Ctx)
return dc return dc
} }