Merge pull request #111475 from alculquicondor/clear_pod_disruption

Add worker to clean up stale DisruptionTarget condition
Kubernetes Prow Robot, 2022-08-02 11:38:18 -07:00, committed by GitHub
commit bc4c4930ff
4 changed files with 446 additions and 44 deletions


@ -51,21 +51,30 @@ import (
"k8s.io/client-go/util/workqueue"
pdbhelper "k8s.io/component-helpers/apps/poddisruptionbudget"
"k8s.io/klog/v2"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
apipod "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller"
utilpod "k8s.io/kubernetes/pkg/util/pod"
"k8s.io/utils/clock"
)
// DeletionTimeout sets maximum time from the moment a pod is added to DisruptedPods in PDB.Status
// to the time when the pod is expected to be seen by PDB controller as having been marked for deletion.
// If the pod was not marked for deletion during that time it is assumed that it won't be deleted at
// all and the corresponding entry can be removed from pdb.Status.DisruptedPods. It is assumed that
// pod/pdb apiserver to controller latency is relatively small (like 1-2sec) so the below value should
// be more than enough.
// If the controller is running on a different node it is important that the two nodes have synced
// clock (via ntp for example). Otherwise PodDisruptionBudget controller may not provide enough
// protection against unwanted pod disruptions.
const (
DeletionTimeout = 2 * 60 * time.Second
// DeletionTimeout sets maximum time from the moment a pod is added to DisruptedPods in PDB.Status
// to the time when the pod is expected to be seen by PDB controller as having been marked for deletion.
// If the pod was not marked for deletion during that time it is assumed that it won't be deleted at
// all and the corresponding entry can be removed from pdb.Status.DisruptedPods. It is assumed that
// pod/pdb apiserver to controller latency is relatively small (like 1-2sec) so the below value should
// be more than enough.
// If the controller is running on a different node it is important that the two nodes have synced
// clock (via ntp for example). Otherwise PodDisruptionBudget controller may not provide enough
// protection against unwanted pod disruptions.
DeletionTimeout = 2 * time.Minute
// stalePodDisruptionTimeout sets the maximum time a pod can have a stale
// DisruptionTarget condition (the condition is present, but the Pod doesn't
// have a DeletionTimestamp).
// Once the timeout is reached, this controller attempts to set the status
// of the condition to False.
stalePodDisruptionTimeout = 2 * time.Minute
)
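
For context, a minimal standalone sketch (not the controller's code) of how DeletionTimeout is consumed further down in countHealthyPods: an entry in pdb.Status.DisruptedPods keeps a pod out of the healthy count only while the entry is younger than DeletionTimeout.

package main

import (
	"fmt"
	"time"
)

// deletionTimeout mirrors the DeletionTimeout constant above.
const deletionTimeout = 2 * time.Minute

// stillConsideredDisrupted mirrors the check in countHealthyPods: a pod recorded in
// pdb.Status.DisruptedPods is excluded from the healthy count only while its entry
// is younger than DeletionTimeout; after that the entry is treated as expired.
func stillConsideredDisrupted(disruptionTime, now time.Time) bool {
	return disruptionTime.Add(deletionTimeout).After(now)
}

func main() {
	now := time.Now()
	fmt.Println(stillConsideredDisrupted(now.Add(-30*time.Second), now)) // true: recently marked, still excluded
	fmt.Println(stillConsideredDisrupted(now.Add(-3*time.Minute), now))  // false: entry expired, pod counted again
}
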
type updater func(context.Context, *policy.PodDisruptionBudget) error
@ -99,10 +108,16 @@ type DisruptionController struct {
queue workqueue.RateLimitingInterface
recheckQueue workqueue.DelayingInterface
// pod keys that need to be synced due to a stale DisruptionTarget condition.
stalePodDisruptionQueue workqueue.RateLimitingInterface
stalePodDisruptionTimeout time.Duration
broadcaster record.EventBroadcaster
recorder record.EventRecorder
getUpdater func() updater
clock clock.Clock
}
// controllerAndScale is used to return (controller, scale) pairs from the
@ -127,12 +142,46 @@ func NewDisruptionController(
restMapper apimeta.RESTMapper,
scaleNamespacer scaleclient.ScalesGetter,
discoveryClient discovery.DiscoveryInterface,
) *DisruptionController {
return NewDisruptionControllerInternal(
podInformer,
pdbInformer,
rcInformer,
rsInformer,
dInformer,
ssInformer,
kubeClient,
restMapper,
scaleNamespacer,
discoveryClient,
clock.RealClock{},
stalePodDisruptionTimeout)
}
// NewDisruptionControllerInternal allows setting a clock and a
// stalePodDisruptionTimeout.
// It is only intended for use by tests.
func NewDisruptionControllerInternal(
podInformer coreinformers.PodInformer,
pdbInformer policyinformers.PodDisruptionBudgetInformer,
rcInformer coreinformers.ReplicationControllerInformer,
rsInformer appsv1informers.ReplicaSetInformer,
dInformer appsv1informers.DeploymentInformer,
ssInformer appsv1informers.StatefulSetInformer,
kubeClient clientset.Interface,
restMapper apimeta.RESTMapper,
scaleNamespacer scaleclient.ScalesGetter,
discoveryClient discovery.DiscoveryInterface,
clock clock.WithTicker,
stalePodDisruptionTimeout time.Duration,
) *DisruptionController {
dc := &DisruptionController{
kubeClient: kubeClient,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "disruption"),
recheckQueue: workqueue.NewNamedDelayingQueue("disruption_recheck"),
broadcaster: record.NewBroadcaster(),
kubeClient: kubeClient,
queue: workqueue.NewRateLimitingQueueWithDelayingInterface(workqueue.NewDelayingQueueWithCustomClock(clock, "disruption"), workqueue.DefaultControllerRateLimiter()),
recheckQueue: workqueue.NewDelayingQueueWithCustomClock(clock, "disruption_recheck"),
stalePodDisruptionQueue: workqueue.NewRateLimitingQueueWithDelayingInterface(workqueue.NewDelayingQueueWithCustomClock(clock, "stale_pod_disruption"), workqueue.DefaultControllerRateLimiter()),
broadcaster: record.NewBroadcaster(),
stalePodDisruptionTimeout: stalePodDisruptionTimeout,
}
dc.recorder = dc.broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "controllermanager"})
@ -172,6 +221,8 @@ func NewDisruptionController(
dc.scaleNamespacer = scaleNamespacer
dc.discoveryClient = discoveryClient
dc.clock = clock
return dc
}
@ -376,6 +427,7 @@ func (dc *DisruptionController) Run(ctx context.Context) {
defer dc.queue.ShutDown()
defer dc.recheckQueue.ShutDown()
defer dc.stalePodDisruptionQueue.ShutDown()
klog.Infof("Starting disruption controller")
defer klog.Infof("Shutting down disruption controller")
@ -386,6 +438,7 @@ func (dc *DisruptionController) Run(ctx context.Context) {
go wait.UntilWithContext(ctx, dc.worker, time.Second)
go wait.Until(dc.recheckWorker, time.Second, ctx.Done())
go wait.UntilWithContext(ctx, dc.stalePodDisruptionWorker, time.Second)
<-ctx.Done()
}
@ -427,22 +480,28 @@ func (dc *DisruptionController) addPod(obj interface{}) {
pdb := dc.getPdbForPod(pod)
if pdb == nil {
klog.V(4).Infof("No matching pdb for pod %q", pod.Name)
return
} else {
klog.V(4).Infof("addPod %q -> PDB %q", pod.Name, pdb.Name)
dc.enqueuePdb(pdb)
}
if has, cleanAfter := dc.nonTerminatingPodHasStaleDisruptionCondition(pod); has {
dc.enqueueStalePodDisruptionCleanup(pod, cleanAfter)
}
klog.V(4).Infof("addPod %q -> PDB %q", pod.Name, pdb.Name)
dc.enqueuePdb(pdb)
}
func (dc *DisruptionController) updatePod(old, cur interface{}) {
func (dc *DisruptionController) updatePod(_, cur interface{}) {
pod := cur.(*v1.Pod)
klog.V(4).Infof("updatePod called on pod %q", pod.Name)
pdb := dc.getPdbForPod(pod)
if pdb == nil {
klog.V(4).Infof("No matching pdb for pod %q", pod.Name)
return
} else {
klog.V(4).Infof("updatePod %q -> PDB %q", pod.Name, pdb.Name)
dc.enqueuePdb(pdb)
}
if has, cleanAfter := dc.nonTerminatingPodHasStaleDisruptionCondition(pod); has {
dc.enqueueStalePodDisruptionCleanup(pod, cleanAfter)
}
klog.V(4).Infof("updatePod %q -> PDB %q", pod.Name, pdb.Name)
dc.enqueuePdb(pdb)
}
func (dc *DisruptionController) deletePod(obj interface{}) {
@ -492,6 +551,16 @@ func (dc *DisruptionController) enqueuePdbForRecheck(pdb *policy.PodDisruptionBu
dc.recheckQueue.AddAfter(key, delay)
}
func (dc *DisruptionController) enqueueStalePodDisruptionCleanup(pod *v1.Pod, d time.Duration) {
key, err := controller.KeyFunc(pod)
if err != nil {
klog.ErrorS(err, "Couldn't get key for Pod object", "pod", klog.KObj(pod))
return
}
dc.stalePodDisruptionQueue.AddAfter(key, d)
klog.V(4).InfoS("Enqueued pod to cleanup stale DisruptionTarget condition", "pod", klog.KObj(pod))
}
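
The keys flowing through this queue are ordinary namespace/name strings: controller.KeyFunc ultimately delegates to cache.MetaNamespaceKeyFunc (via DeletionHandlingMetaNamespaceKeyFunc), and syncStalePodDisruption below splits the key back apart with cache.SplitMetaNamespaceKey. A small illustrative round trip, as a sketch rather than the controller's code:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}}

	// Enqueue side: object -> "namespace/name" key (what controller.KeyFunc produces).
	key, err := cache.MetaNamespaceKeyFunc(pod)
	if err != nil {
		panic(err)
	}
	fmt.Println(key) // default/foo

	// Sync side: key -> namespace and name, as syncStalePodDisruption does below.
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		panic(err)
	}
	fmt.Println(namespace, name) // default foo
}
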
func (dc *DisruptionController) getPdbForPod(pod *v1.Pod) *policy.PodDisruptionBudget {
// GetPodPodDisruptionBudgets returns an error only if no
// PodDisruptionBudgets are found. We don't return that as an error to the
@ -563,10 +632,31 @@ func (dc *DisruptionController) processNextRecheckWorkItem() bool {
return true
}
func (dc *DisruptionController) stalePodDisruptionWorker(ctx context.Context) {
for dc.processNextStalePodDisruptionWorkItem(ctx) {
}
}
func (dc *DisruptionController) processNextStalePodDisruptionWorkItem(ctx context.Context) bool {
key, quit := dc.stalePodDisruptionQueue.Get()
if quit {
return false
}
defer dc.stalePodDisruptionQueue.Done(key)
err := dc.syncStalePodDisruption(ctx, key.(string))
if err == nil {
dc.stalePodDisruptionQueue.Forget(key)
return true
}
utilruntime.HandleError(fmt.Errorf("error syncing Pod %v to clear DisruptionTarget condition, requeueing: %v", key.(string), err))
dc.stalePodDisruptionQueue.AddRateLimited(key)
return true
}
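
For readers less familiar with the client-go workqueue contract used by this worker: Get blocks until an item is available (or the queue shuts down), Done must be called once processing finishes, Forget clears the rate limiter's backoff history after a successful sync, and AddRateLimited re-enqueues with backoff after a failure. A minimal generic sketch of that lifecycle, with a hypothetical processItem standing in for the controller's sync function:

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// processItem is a hypothetical stand-in for a sync function such as syncStalePodDisruption.
func processItem(key string) error {
	fmt.Println("syncing", key)
	return nil
}

func worker(q workqueue.RateLimitingInterface) {
	for {
		item, quit := q.Get() // blocks until an item is ready or the queue is shut down
		if quit {
			return
		}
		func() {
			defer q.Done(item) // always mark the item as done, even on error
			if err := processItem(item.(string)); err != nil {
				q.AddRateLimited(item) // retry later with exponential backoff
				return
			}
			q.Forget(item) // success: drop the backoff history for this key
		}()
	}
}

func main() {
	q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "example")
	q.Add("default/foo")
	q.ShutDown() // Get keeps draining queued items, then reports shutdown
	worker(q)
}
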
func (dc *DisruptionController) sync(ctx context.Context, key string) error {
startTime := time.Now()
startTime := dc.clock.Now()
defer func() {
klog.V(4).Infof("Finished syncing PodDisruptionBudget %q (%v)", key, time.Since(startTime))
klog.V(4).Infof("Finished syncing PodDisruptionBudget %q (%v)", key, dc.clock.Since(startTime))
}()
namespace, name, err := cache.SplitMetaNamespaceKey(key)
@ -617,7 +707,7 @@ func (dc *DisruptionController) trySync(ctx context.Context, pdb *policy.PodDisr
strings.Join(unmanagedPods, ",'"))
}
currentTime := time.Now()
currentTime := dc.clock.Now()
disruptedPods, recheckTime := dc.buildDisruptedPodMap(pods, pdb, currentTime)
currentHealthy := countHealthyPods(pods, disruptedPods, currentTime)
err = dc.updatePdbStatus(ctx, pdb, currentHealthy, desiredHealthy, expectedCount, disruptedPods)
@ -631,6 +721,48 @@ func (dc *DisruptionController) trySync(ctx context.Context, pdb *policy.PodDisr
return err
}
func (dc *DisruptionController) syncStalePodDisruption(ctx context.Context, key string) error {
startTime := dc.clock.Now()
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
defer func() {
klog.V(4).InfoS("Finished syncing Pod to clear DisruptionTarget condition", "pod", klog.KRef(namespace, name), "duration", dc.clock.Since(startTime))
}()
pod, err := dc.podLister.Pods(namespace).Get(name)
if errors.IsNotFound(err) {
klog.V(4).InfoS("Skipping clearing DisruptionTarget condition because pod was deleted", "pod", klog.KObj(pod))
return nil
}
if err != nil {
return err
}
hasCond, cleanAfter := dc.nonTerminatingPodHasStaleDisruptionCondition(pod)
if !hasCond {
return nil
}
if cleanAfter > 0 {
dc.enqueueStalePodDisruptionCleanup(pod, cleanAfter)
return nil
}
newStatus := pod.Status.DeepCopy()
updated := apipod.UpdatePodCondition(newStatus, &v1.PodCondition{
Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
Status: v1.ConditionFalse,
})
if !updated {
return nil
}
if _, _, _, err := utilpod.PatchPodStatus(ctx, dc.kubeClient, namespace, name, pod.UID, pod.Status, *newStatus); err != nil {
return err
}
klog.V(2).InfoS("Reset stale DisruptionTarget condition to False", "pod", klog.KObj(pod))
return nil
}
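
The early return on !updated relies on apipod.UpdatePodCondition reporting whether the condition actually changed, so a pod whose DisruptionTarget condition is already False never triggers a status patch. A small sketch of that behavior (only runnable inside the kubernetes repo, since the helper lives in an internal package):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	apipod "k8s.io/kubernetes/pkg/api/v1/pod"
)

func main() {
	status := v1.PodStatus{
		Conditions: []v1.PodCondition{{
			Type:   v1.AlphaNoCompatGuaranteeDisruptionTarget,
			Status: v1.ConditionTrue,
		}},
	}

	// Flipping True -> False changes the status, so the controller goes on to patch it.
	fmt.Println(apipod.UpdatePodCondition(&status, &v1.PodCondition{
		Type:   v1.AlphaNoCompatGuaranteeDisruptionTarget,
		Status: v1.ConditionFalse,
	})) // true

	// Applying the same False condition again is a no-op, so no patch would be sent.
	fmt.Println(apipod.UpdatePodCondition(&status, &v1.PodCondition{
		Type:   v1.AlphaNoCompatGuaranteeDisruptionTarget,
		Status: v1.ConditionFalse,
	})) // false
}
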
func (dc *DisruptionController) getExpectedPodCount(ctx context.Context, pdb *policy.PodDisruptionBudget, pods []*v1.Pod) (expectedCount, desiredHealthy int32, unmanagedPods []string, err error) {
err = nil
// TODO(davidopp): consider making the way expectedCount and rules about
@ -747,7 +879,7 @@ func countHealthyPods(pods []*v1.Pod, disruptedPods map[string]metav1.Time, curr
if disruptionTime, found := disruptedPods[pod.Name]; found && disruptionTime.Time.Add(DeletionTimeout).After(currentTime) {
continue
}
if podutil.IsPodReady(pod) {
if apipod.IsPodReady(pod) {
currentHealthy++
}
}
@ -857,3 +989,18 @@ func (dc *DisruptionController) writePdbStatus(ctx context.Context, pdb *policy.
_, err := dc.kubeClient.PolicyV1().PodDisruptionBudgets(pdb.Namespace).UpdateStatus(ctx, pdb, metav1.UpdateOptions{})
return err
}
func (dc *DisruptionController) nonTerminatingPodHasStaleDisruptionCondition(pod *v1.Pod) (bool, time.Duration) {
if pod.DeletionTimestamp != nil {
return false, 0
}
_, cond := apipod.GetPodCondition(&pod.Status, v1.AlphaNoCompatGuaranteeDisruptionTarget)
if cond == nil || cond.Status != v1.ConditionTrue {
return false, 0
}
waitFor := dc.stalePodDisruptionTimeout - dc.clock.Since(cond.LastTransitionTime.Time)
if waitFor < 0 {
waitFor = 0
}
return true, waitFor
}
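
A quick worked example of the arithmetic above, as a standalone sketch: with the 2-minute stalePodDisruptionTimeout, a condition that became True 30 seconds ago is re-enqueued for another 90 seconds, while one that became True 3 minutes ago is already stale (the wait clamps to zero and the condition is reset on the next sync).

package main

import (
	"fmt"
	"time"
)

// staleDisruptionWait mirrors the calculation above: how long to wait before a
// DisruptionTarget condition that turned True at lastTransition is considered stale.
func staleDisruptionWait(lastTransition, now time.Time, timeout time.Duration) time.Duration {
	waitFor := timeout - now.Sub(lastTransition)
	if waitFor < 0 {
		waitFor = 0
	}
	return waitFor
}

func main() {
	now := time.Now()
	timeout := 2 * time.Minute // stalePodDisruptionTimeout
	fmt.Println(staleDisruptionWait(now.Add(-30*time.Second), now, timeout)) // 1m30s: re-enqueue and check again later
	fmt.Println(staleDisruptionWait(now.Add(-3*time.Minute), now, timeout))  // 0s: stale, clear the condition now
}
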


@ -27,11 +27,12 @@ import (
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
apps "k8s.io/api/apps/v1"
autoscalingapi "k8s.io/api/autoscaling/v1"
v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/meta/testrestmapper"
@ -52,6 +53,7 @@ import (
"k8s.io/klog/v2"
_ "k8s.io/kubernetes/pkg/apis/core/install"
"k8s.io/kubernetes/pkg/controller"
clocktesting "k8s.io/utils/clock/testing"
utilpointer "k8s.io/utils/pointer"
)
@ -72,8 +74,8 @@ func (ps *pdbStates) Get(key string) policy.PodDisruptionBudget {
return (*ps)[key]
}
func (ps *pdbStates) VerifyPdbStatus(t *testing.T, key string, disruptionsAllowed, currentHealthy, desiredHealthy, expectedPods int32,
disruptedPodMap map[string]metav1.Time) {
func (ps *pdbStates) VerifyPdbStatus(t *testing.T, key string, disruptionsAllowed, currentHealthy, desiredHealthy, expectedPods int32, disruptedPodMap map[string]metav1.Time) {
t.Helper()
actualPDB := ps.Get(key)
actualConditions := actualPDB.Status.Conditions
actualPDB.Status.Conditions = nil
@ -86,9 +88,8 @@ func (ps *pdbStates) VerifyPdbStatus(t *testing.T, key string, disruptionsAllowe
ObservedGeneration: actualPDB.Generation,
}
actualStatus := actualPDB.Status
if !apiequality.Semantic.DeepEqual(actualStatus, expectedStatus) {
debug.PrintStack()
t.Fatalf("PDB %q status mismatch. Expected %+v but got %+v.", key, expectedStatus, actualStatus)
if diff := cmp.Diff(expectedStatus, actualStatus, cmpopts.EquateEmpty()); diff != "" {
t.Fatalf("PDB %q status mismatch (-want,+got):\n%s", key, diff)
}
cond := apimeta.FindStatusCondition(actualConditions, policy.DisruptionAllowedCondition)
@ -138,6 +139,7 @@ type disruptionController struct {
coreClient *fake.Clientset
scaleClient *scalefake.FakeScaleClient
discoveryClient *discoveryfake.FakeDiscovery
informerFactory informers.SharedInformerFactory
}
var customGVK = schema.GroupVersionKind{
@ -147,6 +149,10 @@ var customGVK = schema.GroupVersionKind{
}
func newFakeDisruptionController() (*disruptionController, *pdbStates) {
return newFakeDisruptionControllerWithTime(context.TODO(), time.Now())
}
func newFakeDisruptionControllerWithTime(ctx context.Context, now time.Time) (*disruptionController, *pdbStates) {
ps := &pdbStates{}
coreClient := fake.NewSimpleClientset()
@ -158,8 +164,9 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) {
fakeDiscovery := &discoveryfake.FakeDiscovery{
Fake: &core.Fake{},
}
fakeClock := clocktesting.NewFakeClock(now)
dc := NewDisruptionController(
dc := NewDisruptionControllerInternal(
informerFactory.Core().V1().Pods(),
informerFactory.Policy().V1().PodDisruptionBudgets(),
informerFactory.Core().V1().ReplicationControllers(),
@ -170,6 +177,8 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) {
testrestmapper.TestOnlyStaticRESTMapper(scheme),
fakeScaleClient,
fakeDiscovery,
fakeClock,
stalePodDisruptionTimeout,
)
dc.getUpdater = func() updater { return ps.Set }
dc.podListerSynced = alwaysReady
@ -178,9 +187,8 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) {
dc.rsListerSynced = alwaysReady
dc.dListerSynced = alwaysReady
dc.ssListerSynced = alwaysReady
ctx := context.TODO()
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(nil)
informerFactory.WaitForCacheSync(ctx.Done())
return &disruptionController{
dc,
@ -193,6 +201,7 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) {
coreClient,
fakeScaleClient,
fakeDiscovery,
informerFactory,
}, ps
}
@ -990,17 +999,17 @@ func TestUpdateDisruptedPods(t *testing.T) {
dc, ps := newFakeDisruptionController()
dc.recheckQueue = workqueue.NewNamedDelayingQueue("pdb_queue")
pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromInt(1))
currentTime := time.Now()
currentTime := dc.clock.Now()
pdb.Status.DisruptedPods = map[string]metav1.Time{
"p1": {Time: currentTime}, // Should be removed, pod deletion started.
"p2": {Time: currentTime.Add(-5 * time.Minute)}, // Should be removed, expired.
"p3": {Time: currentTime}, // Should remain, pod untouched.
"p2": {Time: currentTime.Add(-3 * time.Minute)}, // Should be removed, expired.
"p3": {Time: currentTime.Add(-time.Minute)}, // Should remain, pod untouched.
"notthere": {Time: currentTime}, // Should be removed, pod deleted.
}
add(t, dc.pdbStore, pdb)
pod1, _ := newPod(t, "p1")
pod1.DeletionTimestamp = &metav1.Time{Time: time.Now()}
pod1.DeletionTimestamp = &metav1.Time{Time: dc.clock.Now()}
pod2, _ := newPod(t, "p2")
pod3, _ := newPod(t, "p3")
@ -1010,7 +1019,7 @@ func TestUpdateDisruptedPods(t *testing.T) {
dc.sync(context.TODO(), pdbName)
ps.VerifyPdbStatus(t, pdbName, 0, 1, 1, 3, map[string]metav1.Time{"p3": {Time: currentTime}})
ps.VerifyPdbStatus(t, pdbName, 0, 1, 1, 3, map[string]metav1.Time{"p3": {Time: currentTime.Add(-time.Minute)}})
}
func TestBasicFinderFunctions(t *testing.T) {
@ -1284,7 +1293,7 @@ func TestUpdatePDBStatusRetries(t *testing.T) {
updatedPDB.Status.DisruptionsAllowed -= int32(len(podNames))
updatedPDB.Status.DisruptedPods = make(map[string]metav1.Time)
for _, name := range podNames {
updatedPDB.Status.DisruptedPods[name] = metav1.NewTime(time.Now())
updatedPDB.Status.DisruptedPods[name] = metav1.NewTime(dc.clock.Now())
}
if err := dc.coreClient.Tracker().Update(poddisruptionbudgetsResource, updatedPDB, updatedPDB.Namespace); err != nil {
t.Fatalf("Eviction (PDB update) failed: %v", err)
@ -1378,6 +1387,151 @@ func TestInvalidSelectors(t *testing.T) {
}
}
func TestStalePodDisruption(t *testing.T) {
now := time.Now()
cases := map[string]struct {
pod *v1.Pod
timePassed time.Duration
wantConditions []v1.PodCondition
}{
"stale pod disruption": {
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: metav1.NamespaceDefault,
},
Status: v1.PodStatus{
Conditions: []v1.PodCondition{
{
Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
Status: v1.ConditionTrue,
LastTransitionTime: metav1.Time{Time: now},
},
},
},
},
timePassed: 2*time.Minute + time.Second,
wantConditions: []v1.PodCondition{
{
Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
Status: v1.ConditionFalse,
},
},
},
"pod disruption in progress": {
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: metav1.NamespaceDefault,
},
Status: v1.PodStatus{
Conditions: []v1.PodCondition{
{
Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
Status: v1.ConditionTrue,
LastTransitionTime: metav1.Time{Time: now},
},
},
},
},
timePassed: 2*time.Minute - time.Second,
wantConditions: []v1.PodCondition{
{
Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
Status: v1.ConditionTrue,
},
},
},
"pod disruption actuated": {
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: metav1.NamespaceDefault,
DeletionTimestamp: &metav1.Time{Time: now},
},
Status: v1.PodStatus{
Conditions: []v1.PodCondition{
{
Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
Status: v1.ConditionTrue,
LastTransitionTime: metav1.Time{Time: now},
},
},
},
},
timePassed: 2*time.Minute + time.Second,
wantConditions: []v1.PodCondition{
{
Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
Status: v1.ConditionTrue,
},
},
},
"no pod disruption": {
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: metav1.NamespaceDefault,
DeletionTimestamp: &metav1.Time{Time: now},
},
},
timePassed: 2*time.Minute + time.Second,
},
"pod disruption cleared": {
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: metav1.NamespaceDefault,
DeletionTimestamp: &metav1.Time{Time: now},
},
Status: v1.PodStatus{
Conditions: []v1.PodCondition{
{
Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
Status: v1.ConditionFalse,
},
},
},
},
timePassed: 2*time.Minute + time.Second,
wantConditions: []v1.PodCondition{
{
Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
Status: v1.ConditionFalse,
},
},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dc, _ := newFakeDisruptionControllerWithTime(ctx, now)
go dc.Run(ctx)
if _, err := dc.coreClient.CoreV1().Pods(tc.pod.Namespace).Create(ctx, tc.pod, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create pod: %v", err)
}
if err := dc.informerFactory.Core().V1().Pods().Informer().GetIndexer().Add(tc.pod); err != nil {
t.Fatalf("Failed adding pod to indexer: %v", err)
}
dc.clock.Sleep(tc.timePassed)
if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
return dc.stalePodDisruptionQueue.Len() == 0, nil
}); err != nil {
t.Fatalf("Failed waiting for worker to sync: %v", err)
}
pod, err := dc.kubeClient.CoreV1().Pods(tc.pod.Namespace).Get(ctx, tc.pod.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed getting updated pod: %v", err)
}
diff := cmp.Diff(tc.wantConditions, pod.Status.Conditions, cmpopts.IgnoreFields(v1.PodCondition{}, "LastTransitionTime"))
if diff != "" {
t.Errorf("Obtained pod conditions (-want,+got):\n%s", diff)
}
})
}
}
// waitForCacheCount blocks until the given cache store has the desired number
// of items in it. This will return an error if the condition is not met after a
// 10 second timeout.


@ -50,6 +50,13 @@ func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitin
}
}
func NewRateLimitingQueueWithDelayingInterface(di DelayingInterface, rateLimiter RateLimiter) RateLimitingInterface {
return &rateLimitingType{
DelayingInterface: di,
rateLimiter: rateLimiter,
}
}
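
This constructor exists so callers can supply their own DelayingInterface, typically one built with NewDelayingQueueWithCustomClock, which is how the disruption controller above wires a fake clock through its rate-limited queues in tests. A rough usage sketch; the short sleep is only there to let the queue's background goroutine move the item after the fake clock advances:

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
	clocktesting "k8s.io/utils/clock/testing"
)

func main() {
	// A delaying queue driven by a fake clock, wrapped with the default rate limiter
	// via the constructor added here.
	fakeClock := clocktesting.NewFakeClock(time.Now())
	q := workqueue.NewRateLimitingQueueWithDelayingInterface(
		workqueue.NewDelayingQueueWithCustomClock(fakeClock, "example"),
		workqueue.DefaultControllerRateLimiter(),
	)
	defer q.ShutDown()

	q.AddAfter("default/foo", 2*time.Minute)
	fmt.Println("before step:", q.Len()) // 0: the item is still waiting on the fake clock

	fakeClock.Step(2 * time.Minute)    // advance fake time past the delay
	time.Sleep(100 * time.Millisecond) // give the delaying goroutine a moment to requeue the item
	fmt.Println("after step:", q.Len()) // 1: the item is now available via Get
}
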
// rateLimitingType wraps an Interface and provides rateLimited re-enquing
type rateLimitingType struct {
DelayingInterface


@ -24,7 +24,7 @@ import (
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
v1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
"k8s.io/api/policy/v1beta1"
@ -49,9 +49,13 @@ import (
"k8s.io/kubernetes/pkg/controller/disruption"
"k8s.io/kubernetes/test/integration/etcd"
"k8s.io/kubernetes/test/integration/framework"
"k8s.io/kubernetes/test/integration/util"
"k8s.io/utils/clock"
"k8s.io/utils/pointer"
)
const stalePodDisruptionTimeout = 3 * time.Second
func setup(t *testing.T) (*kubeapiservertesting.TestServer, *disruption.DisruptionController, informers.SharedInformerFactory, clientset.Interface, *apiextensionsclientset.Clientset, dynamic.Interface) {
server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins", "ServiceAccount"}, framework.SharedEtcd())
@ -83,7 +87,7 @@ func setup(t *testing.T) (*kubeapiservertesting.TestServer, *disruption.Disrupti
t.Fatalf("Error creating dynamicClient: %v", err)
}
pdbc := disruption.NewDisruptionController(
pdbc := disruption.NewDisruptionControllerInternal(
informers.Core().V1().Pods(),
informers.Policy().V1().PodDisruptionBudgets(),
informers.Core().V1().ReplicationControllers(),
@ -94,6 +98,8 @@ func setup(t *testing.T) (*kubeapiservertesting.TestServer, *disruption.Disrupti
mapper,
scaleClient,
client.Discovery(),
clock.RealClock{},
stalePodDisruptionTimeout,
)
return server, pdbc, informers, clientSet, apiExtensionClient, dynamicClient
}
@ -410,7 +416,7 @@ func createPod(ctx context.Context, t *testing.T, name, namespace string, labels
t.Error(err)
}
addPodConditionReady(pod)
if _, err := clientSet.CoreV1().Pods(namespace).UpdateStatus(context.TODO(), pod, metav1.UpdateOptions{}); err != nil {
if _, err := clientSet.CoreV1().Pods(namespace).UpdateStatus(ctx, pod, metav1.UpdateOptions{}); err != nil {
t.Error(err)
}
}
@ -645,3 +651,91 @@ func TestPatchCompatibility(t *testing.T) {
})
}
}
func TestStalePodDisruption(t *testing.T) {
s, pdbc, informers, clientSet, _, _ := setup(t)
defer s.TearDownFn()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nsName := "pdb-stale-pod-disruption"
createNs(ctx, t, nsName, clientSet)
informers.Start(ctx.Done())
informers.WaitForCacheSync(ctx.Done())
go pdbc.Run(ctx)
cases := map[string]struct {
deletePod bool
wantConditions []v1.PodCondition
}{
"stale-condition": {
wantConditions: []v1.PodCondition{
{
Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
Status: v1.ConditionFalse,
},
},
},
"deleted-pod": {
deletePod: true,
wantConditions: []v1.PodCondition{
{
Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
Status: v1.ConditionTrue,
},
},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
pod := util.InitPausePod(&util.PausePodConfig{
Name: name,
Namespace: nsName,
NodeName: "foo", // mock pod as scheduled so that it's not immediately deleted when calling Delete.
})
var err error
pod, err = util.CreatePausePod(clientSet, pod)
if err != nil {
t.Fatalf("Failed creating pod: %v", err)
}
pod.Status.Phase = v1.PodRunning
pod.Status.Conditions = append(pod.Status.Conditions, v1.PodCondition{
Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
Status: v1.ConditionTrue,
LastTransitionTime: metav1.Now(),
})
pod, err = clientSet.CoreV1().Pods(nsName).UpdateStatus(ctx, pod, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("Failed updating pod: %v", err)
}
if tc.deletePod {
if err := clientSet.CoreV1().Pods(nsName).Delete(ctx, name, metav1.DeleteOptions{}); err != nil {
t.Fatalf("Failed to delete pod: %v", err)
}
}
time.Sleep(stalePodDisruptionTimeout)
diff := ""
if err := wait.PollImmediate(100*time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) {
pod, err = clientSet.CoreV1().Pods(nsName).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return false, err
}
if tc.deletePod && pod.DeletionTimestamp == nil {
return false, nil
}
diff = cmp.Diff(tc.wantConditions, pod.Status.Conditions, cmpopts.IgnoreFields(v1.PodCondition{}, "LastTransitionTime"))
return diff == "", nil
}); err != nil {
t.Errorf("Failed waiting for status to change: %v", err)
if diff != "" {
t.Errorf("Pod has conditions (-want,+got):\n%s", diff)
}
}
})
}
}