Add worker to clean up stale DisruptionTarget condition

Change-Id: I907fbdf01e7ff08d823fb23aa168ff271d8ff1ee
This commit is contained in:
Aldo Culquicondor 2022-07-26 14:24:40 -04:00
parent dad8454ebb
commit 4188d9b646
3 changed files with 386 additions and 28 deletions

View File

@ -51,22 +51,30 @@ import (
"k8s.io/client-go/util/workqueue"
pdbhelper "k8s.io/component-helpers/apps/poddisruptionbudget"
"k8s.io/klog/v2"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
apipod "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller"
utilpod "k8s.io/kubernetes/pkg/util/pod"
"k8s.io/utils/clock"
)
// DeletionTimeout sets maximum time from the moment a pod is added to DisruptedPods in PDB.Status
// to the time when the pod is expected to be seen by PDB controller as having been marked for deletion.
// If the pod was not marked for deletion during that time it is assumed that it won't be deleted at
// all and the corresponding entry can be removed from pdb.Status.DisruptedPods. It is assumed that
// pod/pdb apiserver to controller latency is relatively small (like 1-2sec) so the below value should
// be more than enough.
// If the controller is running on a different node it is important that the two nodes have synced
// clock (via ntp for example). Otherwise PodDisruptionBudget controller may not provide enough
// protection against unwanted pod disruptions.
const (
// DeletionTimeout sets maximum time from the moment a pod is added to DisruptedPods in PDB.Status
// to the time when the pod is expected to be seen by PDB controller as having been marked for deletion.
// If the pod was not marked for deletion during that time it is assumed that it won't be deleted at
// all and the corresponding entry can be removed from pdb.Status.DisruptedPods. It is assumed that
// pod/pdb apiserver to controller latency is relatively small (like 1-2sec) so the below value should
// be more than enough.
// If the controller is running on a different node it is important that the two nodes have synced
// clock (via ntp for example). Otherwise PodDisruptionBudget controller may not provide enough
// protection against unwanted pod disruptions.
DeletionTimeout = 2 * time.Minute
// stalePodDisruptionTimeout sets the maximum time a pod can have a stale
// DisruptionTarget condition (the condition is present, but the Pod doesn't
// have a DeletionTimestamp).
// Once the timeout is reached, this controller attempts to set the status
// of the condition to False.
stalePodDisruptionTimeout = 2 * time.Minute
)
type updater func(context.Context, *policy.PodDisruptionBudget) error
@ -100,6 +108,10 @@ type DisruptionController struct {
queue workqueue.RateLimitingInterface
recheckQueue workqueue.DelayingInterface
// pod keys that need to be synced due to a stale DisruptionTarget condition.
stalePodDisruptionQueue workqueue.RateLimitingInterface
stalePodDisruptionTimeout time.Duration
broadcaster record.EventBroadcaster
recorder record.EventRecorder
@ -142,7 +154,8 @@ func NewDisruptionController(
restMapper,
scaleNamespacer,
discoveryClient,
clock.RealClock{})
clock.RealClock{},
stalePodDisruptionTimeout)
}
// NewDisruptionControllerInternal allows to set a clock and
@ -160,12 +173,15 @@ func NewDisruptionControllerInternal(
scaleNamespacer scaleclient.ScalesGetter,
discoveryClient discovery.DiscoveryInterface,
clock clock.WithTicker,
stalePodDisruptionTimeout time.Duration,
) *DisruptionController {
dc := &DisruptionController{
kubeClient: kubeClient,
queue: workqueue.NewRateLimitingQueueWithDelayingInterface(workqueue.NewDelayingQueueWithCustomClock(clock, "disruption"), workqueue.DefaultControllerRateLimiter()),
recheckQueue: workqueue.NewDelayingQueueWithCustomClock(clock, "disruption_recheck"),
broadcaster: record.NewBroadcaster(),
kubeClient: kubeClient,
queue: workqueue.NewRateLimitingQueueWithDelayingInterface(workqueue.NewDelayingQueueWithCustomClock(clock, "disruption"), workqueue.DefaultControllerRateLimiter()),
recheckQueue: workqueue.NewDelayingQueueWithCustomClock(clock, "disruption_recheck"),
stalePodDisruptionQueue: workqueue.NewRateLimitingQueueWithDelayingInterface(workqueue.NewDelayingQueueWithCustomClock(clock, "stale_pod_disruption"), workqueue.DefaultControllerRateLimiter()),
broadcaster: record.NewBroadcaster(),
stalePodDisruptionTimeout: stalePodDisruptionTimeout,
}
dc.recorder = dc.broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "controllermanager"})
@ -411,6 +427,7 @@ func (dc *DisruptionController) Run(ctx context.Context) {
defer dc.queue.ShutDown()
defer dc.recheckQueue.ShutDown()
defer dc.stalePodDisruptionQueue.ShutDown()
klog.Infof("Starting disruption controller")
defer klog.Infof("Shutting down disruption controller")
@ -421,6 +438,7 @@ func (dc *DisruptionController) Run(ctx context.Context) {
go wait.UntilWithContext(ctx, dc.worker, time.Second)
go wait.Until(dc.recheckWorker, time.Second, ctx.Done())
go wait.UntilWithContext(ctx, dc.stalePodDisruptionWorker, time.Second)
<-ctx.Done()
}
@ -462,22 +480,28 @@ func (dc *DisruptionController) addPod(obj interface{}) {
pdb := dc.getPdbForPod(pod)
if pdb == nil {
klog.V(4).Infof("No matching pdb for pod %q", pod.Name)
return
} else {
klog.V(4).Infof("addPod %q -> PDB %q", pod.Name, pdb.Name)
dc.enqueuePdb(pdb)
}
if has, cleanAfter := dc.nonTerminatingPodHasStaleDisruptionCondition(pod); has {
dc.enqueueStalePodDisruptionCleanup(pod, cleanAfter)
}
klog.V(4).Infof("addPod %q -> PDB %q", pod.Name, pdb.Name)
dc.enqueuePdb(pdb)
}
func (dc *DisruptionController) updatePod(old, cur interface{}) {
func (dc *DisruptionController) updatePod(_, cur interface{}) {
pod := cur.(*v1.Pod)
klog.V(4).Infof("updatePod called on pod %q", pod.Name)
pdb := dc.getPdbForPod(pod)
if pdb == nil {
klog.V(4).Infof("No matching pdb for pod %q", pod.Name)
return
} else {
klog.V(4).Infof("updatePod %q -> PDB %q", pod.Name, pdb.Name)
dc.enqueuePdb(pdb)
}
if has, cleanAfter := dc.nonTerminatingPodHasStaleDisruptionCondition(pod); has {
dc.enqueueStalePodDisruptionCleanup(pod, cleanAfter)
}
klog.V(4).Infof("updatePod %q -> PDB %q", pod.Name, pdb.Name)
dc.enqueuePdb(pdb)
}
func (dc *DisruptionController) deletePod(obj interface{}) {
@ -527,6 +551,16 @@ func (dc *DisruptionController) enqueuePdbForRecheck(pdb *policy.PodDisruptionBu
dc.recheckQueue.AddAfter(key, delay)
}
func (dc *DisruptionController) enqueueStalePodDisruptionCleanup(pod *v1.Pod, d time.Duration) {
key, err := controller.KeyFunc(pod)
if err != nil {
klog.ErrorS(err, "Couldn't get key for Pod object", "pod", klog.KObj(pod))
return
}
dc.stalePodDisruptionQueue.AddAfter(key, d)
klog.V(4).InfoS("Enqueued pod to cleanup stale DisruptionTarget condition", "pod", klog.KObj(pod))
}
func (dc *DisruptionController) getPdbForPod(pod *v1.Pod) *policy.PodDisruptionBudget {
// GetPodPodDisruptionBudgets returns an error only if no
// PodDisruptionBudgets are found. We don't return that as an error to the
@ -598,6 +632,27 @@ func (dc *DisruptionController) processNextRecheckWorkItem() bool {
return true
}
func (dc *DisruptionController) stalePodDisruptionWorker(ctx context.Context) {
for dc.processNextStalePodDisruptionWorkItem(ctx) {
}
}
func (dc *DisruptionController) processNextStalePodDisruptionWorkItem(ctx context.Context) bool {
key, quit := dc.stalePodDisruptionQueue.Get()
if quit {
return false
}
defer dc.stalePodDisruptionQueue.Done(key)
err := dc.syncStalePodDisruption(ctx, key.(string))
if err == nil {
dc.queue.Forget(key)
return true
}
utilruntime.HandleError(fmt.Errorf("error syncing Pod %v to clear DisruptionTarget condition, requeueing: %v", key.(string), err))
dc.stalePodDisruptionQueue.AddRateLimited(key)
return true
}
func (dc *DisruptionController) sync(ctx context.Context, key string) error {
startTime := dc.clock.Now()
defer func() {
@ -666,6 +721,48 @@ func (dc *DisruptionController) trySync(ctx context.Context, pdb *policy.PodDisr
return err
}
func (dc *DisruptionController) syncStalePodDisruption(ctx context.Context, key string) error {
startTime := dc.clock.Now()
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
defer func() {
klog.V(4).InfoS("Finished syncing Pod to clear DisruptionTarget condition", "pod", klog.KRef(namespace, name), "duration", dc.clock.Since(startTime))
}()
pod, err := dc.podLister.Pods(namespace).Get(name)
if errors.IsNotFound(err) {
klog.V(4).InfoS("Skipping clearing DisruptionTarget condition because pod was deleted", "pod", klog.KObj(pod))
return nil
}
if err != nil {
return err
}
hasCond, cleanAfter := dc.nonTerminatingPodHasStaleDisruptionCondition(pod)
if !hasCond {
return nil
}
if cleanAfter > 0 {
dc.enqueueStalePodDisruptionCleanup(pod, cleanAfter)
return nil
}
newStatus := pod.Status.DeepCopy()
updated := apipod.UpdatePodCondition(newStatus, &v1.PodCondition{
Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
Status: v1.ConditionFalse,
})
if !updated {
return nil
}
if _, _, _, err := utilpod.PatchPodStatus(ctx, dc.kubeClient, namespace, name, pod.UID, pod.Status, *newStatus); err != nil {
return err
}
klog.V(2).InfoS("Reset stale DisruptionTarget condition to False", "pod", klog.KObj(pod))
return nil
}
func (dc *DisruptionController) getExpectedPodCount(ctx context.Context, pdb *policy.PodDisruptionBudget, pods []*v1.Pod) (expectedCount, desiredHealthy int32, unmanagedPods []string, err error) {
err = nil
// TODO(davidopp): consider making the way expectedCount and rules about
@ -782,7 +879,7 @@ func countHealthyPods(pods []*v1.Pod, disruptedPods map[string]metav1.Time, curr
if disruptionTime, found := disruptedPods[pod.Name]; found && disruptionTime.Time.Add(DeletionTimeout).After(currentTime) {
continue
}
if podutil.IsPodReady(pod) {
if apipod.IsPodReady(pod) {
currentHealthy++
}
}
@ -892,3 +989,18 @@ func (dc *DisruptionController) writePdbStatus(ctx context.Context, pdb *policy.
_, err := dc.kubeClient.PolicyV1().PodDisruptionBudgets(pdb.Namespace).UpdateStatus(ctx, pdb, metav1.UpdateOptions{})
return err
}
func (dc *DisruptionController) nonTerminatingPodHasStaleDisruptionCondition(pod *v1.Pod) (bool, time.Duration) {
if pod.DeletionTimestamp != nil {
return false, 0
}
_, cond := apipod.GetPodCondition(&pod.Status, v1.AlphaNoCompatGuaranteeDisruptionTarget)
if cond == nil || cond.Status != v1.ConditionTrue {
return false, 0
}
waitFor := dc.stalePodDisruptionTimeout - dc.clock.Since(cond.LastTransitionTime.Time)
if waitFor < 0 {
waitFor = 0
}
return true, waitFor
}

View File

@ -139,6 +139,7 @@ type disruptionController struct {
coreClient *fake.Clientset
scaleClient *scalefake.FakeScaleClient
discoveryClient *discoveryfake.FakeDiscovery
informerFactory informers.SharedInformerFactory
}
var customGVK = schema.GroupVersionKind{
@ -148,6 +149,10 @@ var customGVK = schema.GroupVersionKind{
}
func newFakeDisruptionController() (*disruptionController, *pdbStates) {
return newFakeDisruptionControllerWithTime(context.TODO(), time.Now())
}
func newFakeDisruptionControllerWithTime(ctx context.Context, now time.Time) (*disruptionController, *pdbStates) {
ps := &pdbStates{}
coreClient := fake.NewSimpleClientset()
@ -159,7 +164,7 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) {
fakeDiscovery := &discoveryfake.FakeDiscovery{
Fake: &core.Fake{},
}
fakeClock := clocktesting.NewFakeClock(time.Now())
fakeClock := clocktesting.NewFakeClock(now)
dc := NewDisruptionControllerInternal(
informerFactory.Core().V1().Pods(),
@ -173,6 +178,7 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) {
fakeScaleClient,
fakeDiscovery,
fakeClock,
stalePodDisruptionTimeout,
)
dc.getUpdater = func() updater { return ps.Set }
dc.podListerSynced = alwaysReady
@ -181,9 +187,8 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) {
dc.rsListerSynced = alwaysReady
dc.dListerSynced = alwaysReady
dc.ssListerSynced = alwaysReady
ctx := context.TODO()
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(nil)
informerFactory.WaitForCacheSync(ctx.Done())
return &disruptionController{
dc,
@ -196,6 +201,7 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) {
coreClient,
fakeScaleClient,
fakeDiscovery,
informerFactory,
}, ps
}
@ -1381,6 +1387,151 @@ func TestInvalidSelectors(t *testing.T) {
}
}
func TestStalePodDisruption(t *testing.T) {
now := time.Now()
cases := map[string]struct {
pod *v1.Pod
timePassed time.Duration
wantConditions []v1.PodCondition
}{
"stale pod disruption": {
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: metav1.NamespaceDefault,
},
Status: v1.PodStatus{
Conditions: []v1.PodCondition{
{
Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
Status: v1.ConditionTrue,
LastTransitionTime: metav1.Time{Time: now},
},
},
},
},
timePassed: 2*time.Minute + time.Second,
wantConditions: []v1.PodCondition{
{
Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
Status: v1.ConditionFalse,
},
},
},
"pod disruption in progress": {
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: metav1.NamespaceDefault,
},
Status: v1.PodStatus{
Conditions: []v1.PodCondition{
{
Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
Status: v1.ConditionTrue,
LastTransitionTime: metav1.Time{Time: now},
},
},
},
},
timePassed: 2*time.Minute - time.Second,
wantConditions: []v1.PodCondition{
{
Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
Status: v1.ConditionTrue,
},
},
},
"pod disruption actuated": {
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: metav1.NamespaceDefault,
DeletionTimestamp: &metav1.Time{Time: now},
},
Status: v1.PodStatus{
Conditions: []v1.PodCondition{
{
Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
Status: v1.ConditionTrue,
LastTransitionTime: metav1.Time{Time: now},
},
},
},
},
timePassed: 2*time.Minute + time.Second,
wantConditions: []v1.PodCondition{
{
Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
Status: v1.ConditionTrue,
},
},
},
"no pod disruption": {
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: metav1.NamespaceDefault,
DeletionTimestamp: &metav1.Time{Time: now},
},
},
timePassed: 2*time.Minute + time.Second,
},
"pod disruption cleared": {
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: metav1.NamespaceDefault,
DeletionTimestamp: &metav1.Time{Time: now},
},
Status: v1.PodStatus{
Conditions: []v1.PodCondition{
{
Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
Status: v1.ConditionFalse,
},
},
},
},
timePassed: 2*time.Minute + time.Second,
wantConditions: []v1.PodCondition{
{
Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
Status: v1.ConditionFalse,
},
},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dc, _ := newFakeDisruptionControllerWithTime(ctx, now)
go dc.Run(ctx)
if _, err := dc.coreClient.CoreV1().Pods(tc.pod.Namespace).Create(ctx, tc.pod, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create pod: %v", err)
}
if err := dc.informerFactory.Core().V1().Pods().Informer().GetIndexer().Add(tc.pod); err != nil {
t.Fatalf("Failed adding pod to indexer: %v", err)
}
dc.clock.Sleep(tc.timePassed)
if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
return dc.stalePodDisruptionQueue.Len() == 0, nil
}); err != nil {
t.Fatalf("Failed waiting for worker to sync: %v", err)
}
pod, err := dc.kubeClient.CoreV1().Pods(tc.pod.Namespace).Get(ctx, tc.pod.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed getting updated pod: %v", err)
}
diff := cmp.Diff(tc.wantConditions, pod.Status.Conditions, cmpopts.IgnoreFields(v1.PodCondition{}, "LastTransitionTime"))
if diff != "" {
t.Errorf("Obtained pod conditions (-want,+got):\n%s", diff)
}
})
}
}
// waitForCacheCount blocks until the given cache store has the desired number
// of items in it. This will return an error if the condition is not met after a
// 10 second timeout.

View File

@ -24,6 +24,7 @@ import (
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
v1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
"k8s.io/api/policy/v1beta1"
@ -48,9 +49,13 @@ import (
"k8s.io/kubernetes/pkg/controller/disruption"
"k8s.io/kubernetes/test/integration/etcd"
"k8s.io/kubernetes/test/integration/framework"
"k8s.io/kubernetes/test/integration/util"
"k8s.io/utils/clock"
"k8s.io/utils/pointer"
)
const stalePodDisruptionTimeout = 3 * time.Second
func setup(t *testing.T) (*kubeapiservertesting.TestServer, *disruption.DisruptionController, informers.SharedInformerFactory, clientset.Interface, *apiextensionsclientset.Clientset, dynamic.Interface) {
server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins", "ServiceAccount"}, framework.SharedEtcd())
@ -82,7 +87,7 @@ func setup(t *testing.T) (*kubeapiservertesting.TestServer, *disruption.Disrupti
t.Fatalf("Error creating dynamicClient: %v", err)
}
pdbc := disruption.NewDisruptionController(
pdbc := disruption.NewDisruptionControllerInternal(
informers.Core().V1().Pods(),
informers.Policy().V1().PodDisruptionBudgets(),
informers.Core().V1().ReplicationControllers(),
@ -93,6 +98,8 @@ func setup(t *testing.T) (*kubeapiservertesting.TestServer, *disruption.Disrupti
mapper,
scaleClient,
client.Discovery(),
clock.RealClock{},
stalePodDisruptionTimeout,
)
return server, pdbc, informers, clientSet, apiExtensionClient, dynamicClient
}
@ -409,7 +416,7 @@ func createPod(ctx context.Context, t *testing.T, name, namespace string, labels
t.Error(err)
}
addPodConditionReady(pod)
if _, err := clientSet.CoreV1().Pods(namespace).UpdateStatus(context.TODO(), pod, metav1.UpdateOptions{}); err != nil {
if _, err := clientSet.CoreV1().Pods(namespace).UpdateStatus(ctx, pod, metav1.UpdateOptions{}); err != nil {
t.Error(err)
}
}
@ -644,3 +651,91 @@ func TestPatchCompatibility(t *testing.T) {
})
}
}
func TestStalePodDisruption(t *testing.T) {
s, pdbc, informers, clientSet, _, _ := setup(t)
defer s.TearDownFn()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nsName := "pdb-stale-pod-disruption"
createNs(ctx, t, nsName, clientSet)
informers.Start(ctx.Done())
informers.WaitForCacheSync(ctx.Done())
go pdbc.Run(ctx)
cases := map[string]struct {
deletePod bool
wantConditions []v1.PodCondition
}{
"stale-condition": {
wantConditions: []v1.PodCondition{
{
Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
Status: v1.ConditionFalse,
},
},
},
"deleted-pod": {
deletePod: true,
wantConditions: []v1.PodCondition{
{
Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
Status: v1.ConditionTrue,
},
},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
pod := util.InitPausePod(&util.PausePodConfig{
Name: name,
Namespace: nsName,
NodeName: "foo", // mock pod as scheduled so that it's not immediately deleted when calling Delete.
})
var err error
pod, err = util.CreatePausePod(clientSet, pod)
if err != nil {
t.Fatalf("Failed creating pod: %v", err)
}
pod.Status.Phase = v1.PodRunning
pod.Status.Conditions = append(pod.Status.Conditions, v1.PodCondition{
Type: v1.AlphaNoCompatGuaranteeDisruptionTarget,
Status: v1.ConditionTrue,
LastTransitionTime: metav1.Now(),
})
pod, err = clientSet.CoreV1().Pods(nsName).UpdateStatus(ctx, pod, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("Failed updating pod: %v", err)
}
if tc.deletePod {
if err := clientSet.CoreV1().Pods(nsName).Delete(ctx, name, metav1.DeleteOptions{}); err != nil {
t.Fatalf("Failed to delete pod: %v", err)
}
}
time.Sleep(stalePodDisruptionTimeout)
diff := ""
if err := wait.PollImmediate(100*time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) {
pod, err = clientSet.CoreV1().Pods(nsName).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return false, err
}
if tc.deletePod && pod.DeletionTimestamp == nil {
return false, nil
}
diff = cmp.Diff(tc.wantConditions, pod.Status.Conditions, cmpopts.IgnoreFields(v1.PodCondition{}, "LastTransitionTime"))
return diff == "", nil
}); err != nil {
t.Errorf("Failed waiting for status to change: %v", err)
if diff != "" {
t.Errorf("Pod has conditions (-want,+got):\n%s", diff)
}
}
})
}
}