diff --git a/pkg/controller/disruption/BUILD b/pkg/controller/disruption/BUILD
index 01658479321..2d1560f189e 100644
--- a/pkg/controller/disruption/BUILD
+++ b/pkg/controller/disruption/BUILD
@@ -32,7 +32,6 @@ go_library(
         "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
-        "//staging/src/k8s.io/client-go/kubernetes/typed/policy/v1beta1:go_default_library",
         "//staging/src/k8s.io/client-go/listers/apps/v1:go_default_library",
         "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
         "//staging/src/k8s.io/client-go/listers/policy/v1beta1:go_default_library",
@@ -56,6 +55,7 @@ go_test(
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/api/meta/testrestmapper:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
@@ -63,11 +63,14 @@ go_test(
         "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//staging/src/k8s.io/client-go/informers:go_default_library",
+        "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
         "//staging/src/k8s.io/client-go/scale/fake:go_default_library",
         "//staging/src/k8s.io/client-go/testing:go_default_library",
         "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
         "//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
+        "//vendor/k8s.io/klog:go_default_library",
         "//vendor/k8s.io/utils/pointer:go_default_library",
     ],
 )
diff --git a/pkg/controller/disruption/disruption.go b/pkg/controller/disruption/disruption.go
index 8be258d10c7..7e72dd1b03a 100644
--- a/pkg/controller/disruption/disruption.go
+++ b/pkg/controller/disruption/disruption.go
@@ -39,7 +39,6 @@ import (
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
 	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
-	policyclientset "k8s.io/client-go/kubernetes/typed/policy/v1beta1"
 	appsv1listers "k8s.io/client-go/listers/apps/v1"
 	corelisters "k8s.io/client-go/listers/core/v1"
 	policylisters "k8s.io/client-go/listers/policy/v1beta1"
@@ -53,8 +52,6 @@ import (
 	"k8s.io/klog"
 )
 
-const statusUpdateRetries = 2
-
 // DeletionTimeout sets maximum time from the moment a pod is added to DisruptedPods in PDB.Status
 // to the time when the pod is expected to be seen by PDB controller as having been marked for deletion.
 // If the pod was not marked for deletion during that time it is assumed that it won't be deleted at
@@ -544,7 +541,13 @@ func (dc *DisruptionController) sync(key string) error {
 		return err
 	}
 
-	if err := dc.trySync(pdb); err != nil {
+	err = dc.trySync(pdb)
+	// If the reason for failure was a conflict, then allow this PDB update to be
+	// requeued without triggering the failSafe logic.
+	if errors.IsConflict(err) {
+		return err
+	}
+	if err != nil {
 		klog.Errorf("Failed to sync pdb %s/%s: %v", pdb.Namespace, pdb.Name, err)
 		return dc.failSafe(pdb)
 	}
@@ -785,29 +788,9 @@ func (dc *DisruptionController) updatePdbStatus(pdb *policy.PodDisruptionBudget,
 	return dc.getUpdater()(newPdb)
 }
 
-// refresh tries to re-GET the given PDB. If there are any errors, it just
-// returns the old PDB. Intended to be used in a retry loop where it runs a
-// bounded number of times.
-func refresh(pdbClient policyclientset.PodDisruptionBudgetInterface, pdb *policy.PodDisruptionBudget) *policy.PodDisruptionBudget {
-	newPdb, err := pdbClient.Get(pdb.Name, metav1.GetOptions{})
-	if err == nil {
-		return newPdb
-	}
-	return pdb
-
-}
-
 func (dc *DisruptionController) writePdbStatus(pdb *policy.PodDisruptionBudget) error {
-	pdbClient := dc.kubeClient.PolicyV1beta1().PodDisruptionBudgets(pdb.Namespace)
-	st := pdb.Status
-
-	var err error
-	for i, pdb := 0, pdb; i < statusUpdateRetries; i, pdb = i+1, refresh(pdbClient, pdb) {
-		pdb.Status = st
-		if _, err = pdbClient.UpdateStatus(pdb); err == nil {
-			break
-		}
-	}
-
+	// If this update fails, don't retry it. Allow the failure to get handled &
+	// retried in `processNextWorkItem()`.
+	_, err := dc.kubeClient.PolicyV1beta1().PodDisruptionBudgets(pdb.Namespace).UpdateStatus(pdb)
 	return err
 }
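Note on the change above: with the in-place retry loop removed, a conflict from UpdateStatus now propagates out of sync() and the key is retried by the controller's workqueue with backoff instead of being marked with the failSafe condition. The sketch below illustrates that standard workqueue worker pattern in a self-contained form. It is not code from this patch: the function and key names are illustrative, and the method signatures assume the pre-context client-go vendored in this tree.

package main

import (
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog"
)

// processNext drains one key and requeues it with backoff when sync fails.
// A 409 Conflict from a stale status write takes the same path as any other
// error: the key is retried and the next sync reads fresh lister data.
func processNext(queue workqueue.RateLimitingInterface, sync func(key string) error) bool {
	item, quit := queue.Get()
	if quit {
		return false
	}
	defer queue.Done(item)

	key := item.(string)
	if err := sync(key); err != nil {
		if apierrors.IsConflict(err) {
			klog.V(4).Infof("conflict syncing %q, will retry with fresh data: %v", key, err)
		}
		// Backoff-and-retry; no failSafe-style permanent marking here.
		queue.AddRateLimited(item)
		return true
	}
	queue.Forget(item)
	return true
}

func main() {
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	queue.Add("default/my-pdb") // hypothetical key
	processNext(queue, func(key string) error {
		return fmt.Errorf("simulated failure for %s", key)
	})
}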
diff --git a/pkg/controller/disruption/disruption_test.go b/pkg/controller/disruption/disruption_test.go
index ea3d4570c4a..fe3769b5062 100644
--- a/pkg/controller/disruption/disruption_test.go
+++ b/pkg/controller/disruption/disruption_test.go
@@ -17,16 +17,21 @@ limitations under the License.
 package disruption
 
 import (
+	"context"
+	"flag"
 	"fmt"
+	"os"
 	"runtime/debug"
+	"sync"
 	"testing"
 	"time"
 
 	apps "k8s.io/api/apps/v1"
 	autoscalingapi "k8s.io/api/autoscaling/v1"
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	policy "k8s.io/api/policy/v1beta1"
 	apiequality "k8s.io/apimachinery/pkg/api/equality"
+	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/meta/testrestmapper"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
@@ -34,11 +39,14 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/apimachinery/pkg/util/uuid"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes/fake"
 	scalefake "k8s.io/client-go/scale/fake"
 	core "k8s.io/client-go/testing"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/util/workqueue"
+	"k8s.io/klog"
 	_ "k8s.io/kubernetes/pkg/apis/core/install"
 	"k8s.io/kubernetes/pkg/controller"
 	utilpointer "k8s.io/utils/pointer"
@@ -97,6 +105,7 @@ type disruptionController struct {
 	dStore  cache.Store
 	ssStore cache.Store
 
+	coreClient  *fake.Clientset
 	scaleClient *scalefake.FakeScaleClient
 }
 
@@ -109,7 +118,8 @@ var customGVK = schema.GroupVersionKind{
 
 func newFakeDisruptionController() (*disruptionController, *pdbStates) {
 	ps := &pdbStates{}
-	informerFactory := informers.NewSharedInformerFactory(nil, controller.NoResyncPeriodFunc())
+	coreClient := fake.NewSimpleClientset()
+	informerFactory := informers.NewSharedInformerFactory(coreClient, controller.NoResyncPeriodFunc())
 
 	scheme := runtime.NewScheme()
 	scheme.AddKnownTypeWithName(customGVK, &v1.Service{})
@@ -122,7 +132,7 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) {
 		informerFactory.Apps().V1().ReplicaSets(),
 		informerFactory.Apps().V1().Deployments(),
 		informerFactory.Apps().V1().StatefulSets(),
-		nil,
+		coreClient,
 		testrestmapper.TestOnlyStaticRESTMapper(scheme),
 		fakeScaleClient,
 	)
@@ -134,6 +144,9 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) {
 	dc.dListerSynced = alwaysReady
 	dc.ssListerSynced = alwaysReady
 
+	informerFactory.Start(context.TODO().Done())
+	informerFactory.WaitForCacheSync(nil)
+
 	return &disruptionController{
 		dc,
 		informerFactory.Core().V1().Pods().Informer().GetStore(),
@@ -142,6 +155,7 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) {
 		informerFactory.Apps().V1().ReplicaSets().Informer().GetStore(),
 		informerFactory.Apps().V1().Deployments().Informer().GetStore(),
 		informerFactory.Apps().V1().StatefulSets().Informer().GetStore(),
+		coreClient,
 		fakeScaleClient,
 	}, ps
 }
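The fixture changes above back the shared informer factory with the fake clientset and start it, so the controller reads from listers that are fed by the same client the test writes to. A minimal, standalone sketch of that wiring follows; the pod and namespace names are invented for illustration, and the method signatures assume the pre-context client-go vendored in this tree.

package main

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// Fake clientset seeded with one pod; the informer factory lists and
	// watches it exactly as it would a real API server.
	client := fake.NewSimpleClientset(&v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "example-pod", Namespace: "default"},
	})

	factory := informers.NewSharedInformerFactory(client, 0)
	podInformer := factory.Core().V1().Pods()
	podLister := podInformer.Lister()

	// Start the informers and wait for the initial list to land in the cache,
	// analogous to the Start/WaitForCacheSync calls added to the fixture.
	stopCh := context.TODO().Done()
	factory.Start(stopCh)
	if !cache.WaitForCacheSync(stopCh, podInformer.Informer().HasSynced) {
		panic("informer cache never synced")
	}

	pods, _ := podLister.Pods("default").List(labels.Everything())
	fmt.Println("pods visible through the lister:", len(pods))
}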
@@ -1025,3 +1039,144 @@ func TestDeploymentFinderFunction(t *testing.T) {
 		})
 	}
 }
+
+// This test checks that the disruption controller does not write stale data to
+// a PDB status during race conditions with the eviction handler. Specifically,
+// failed updates due to ResourceVersion conflict should not cause a stale value
+// of PodDisruptionsAllowed to be written.
+//
+// In this test, PodDisruptionsAllowed starts at 2.
+// (A) We will delete 1 pod and trigger DisruptionController to set
+// PodDisruptionsAllowed to 1.
+// (B) As the DisruptionController attempts this write, we will evict the
+// remaining 2 pods and update PodDisruptionsAllowed to 0. (The real eviction
+// handler would allow this because it still sees PodDisruptionsAllowed=2.)
+// (C) If the DisruptionController writes PodDisruptionsAllowed=1 despite the
+// resource conflict error, then there is a bug.
+func TestUpdatePDBStatusRetries(t *testing.T) {
+	dc, _ := newFakeDisruptionController()
+	// Inject the production code over our fake impl
+	dc.getUpdater = func() updater { return dc.writePdbStatus }
+
+	// Create a PDB and 3 pods that match it.
+	pdb, pdbKey := newMinAvailablePodDisruptionBudget(t, intstr.FromInt(1))
+	pdb, err := dc.coreClient.PolicyV1beta1().PodDisruptionBudgets(pdb.Namespace).Create(pdb)
+	if err != nil {
+		t.Fatalf("Failed to create PDB: %v", err)
+	}
+	podNames := []string{"moe", "larry", "curly"}
+	for _, name := range podNames {
+		pod, _ := newPod(t, name)
+		_, err := dc.coreClient.CoreV1().Pods(pod.Namespace).Create(pod)
+		if err != nil {
+			t.Fatalf("Failed to create pod: %v", err)
+		}
+	}
+
+	// Block until the fake clientset writes are observable in the informer caches.
+	// FUN FACT: This guarantees that the informer caches have updated, but it does
+	// not guarantee that informer event handlers have completed. Fortunately,
+	// DisruptionController does most of its logic by reading from informer
+	// listers, so this guarantee is sufficient.
+	if err := waitForCacheCount(dc.pdbStore, 1); err != nil {
+		t.Fatalf("Failed to verify PDB in informer cache: %v", err)
+	}
+	if err := waitForCacheCount(dc.podStore, len(podNames)); err != nil {
+		t.Fatalf("Failed to verify pods in informer cache: %v", err)
+	}
+
+	// Sync DisruptionController once to update PDB status.
+	if err := dc.sync(pdbKey); err != nil {
+		t.Fatalf("Failed initial sync: %v", err)
+	}
+
+	// Evict simulates the visible effects of eviction in our fake client.
+	evict := func(podNames ...string) {
+		// These GVRs are copied from the generated fake code because they are not exported.
+		var (
+			podsResource                 = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
+			poddisruptionbudgetsResource = schema.GroupVersionResource{Group: "policy", Version: "v1beta1", Resource: "poddisruptionbudgets"}
+		)
+
+		// Bypass the coreClient.Fake and write directly to the ObjectTracker, because
+		// this helper will be called while the Fake is holding a lock.
+		obj, err := dc.coreClient.Tracker().Get(poddisruptionbudgetsResource, pdb.Namespace, pdb.Name)
+		if err != nil {
+			t.Fatalf("Failed to get PDB: %v", err)
+		}
+		updatedPDB := obj.(*policy.PodDisruptionBudget)
+		// Each eviction,
+		// - decrements PodDisruptionsAllowed
+		// - adds the pod to DisruptedPods
+		// - deletes the pod
+		updatedPDB.Status.PodDisruptionsAllowed -= int32(len(podNames))
+		updatedPDB.Status.DisruptedPods = make(map[string]metav1.Time)
+		for _, name := range podNames {
+			updatedPDB.Status.DisruptedPods[name] = metav1.NewTime(time.Now())
+		}
+		if err := dc.coreClient.Tracker().Update(poddisruptionbudgetsResource, updatedPDB, updatedPDB.Namespace); err != nil {
+			t.Fatalf("Eviction (PDB update) failed: %v", err)
+		}
+		for _, name := range podNames {
+			if err := dc.coreClient.Tracker().Delete(podsResource, "default", name); err != nil {
+				t.Fatalf("Eviction (pod delete) failed: %v", err)
+			}
+		}
+	}
+
+	// The fake kube client does not update ResourceVersion or check for conflicts.
+	// Instead, we add a reactor that returns a conflict error on the first PDB
+	// update and success after that.
+	var failOnce sync.Once
+	dc.coreClient.Fake.PrependReactor("update", "poddisruptionbudgets", func(a core.Action) (handled bool, obj runtime.Object, err error) {
+		failOnce.Do(func() {
+			// (B) Evict two pods and fail this update.
+			evict(podNames[1], podNames[2])
+			handled = true
+			err = errors.NewConflict(a.GetResource().GroupResource(), pdb.Name, fmt.Errorf("conflict"))
+		})
+		return handled, obj, err
+	})
+
+	// (A) Delete one pod
+	if err := dc.coreClient.CoreV1().Pods("default").Delete(podNames[0], &metav1.DeleteOptions{}); err != nil {
+		t.Fatal(err)
+	}
+	if err := waitForCacheCount(dc.podStore, len(podNames)-1); err != nil {
+		t.Fatalf("Failed to verify pods in informer cache: %v", err)
+	}
+
+	// The sync() function should either write a correct status which takes the
+	// evictions into account, or re-queue the PDB for another sync (by returning
+	// an error)
+	if err := dc.sync(pdbKey); err != nil {
+		t.Logf("sync() returned with error: %v", err)
+	} else {
+		t.Logf("sync() returned with no error")
+	}
+
+	// (C) Whether or not sync() returned an error, the PDB status should reflect
+	// the evictions that took place.
+	finalPDB, err := dc.coreClient.PolicyV1beta1().PodDisruptionBudgets("default").Get(pdb.Name, metav1.GetOptions{})
+	if err != nil {
+		t.Fatalf("Failed to get PDB: %v", err)
+	}
+	if expected, actual := int32(0), finalPDB.Status.PodDisruptionsAllowed; expected != actual {
+		t.Errorf("PodDisruptionsAllowed should be %d, got %d", expected, actual)
+	}
+}
+
+// waitForCacheCount blocks until the given cache store has the desired number
+// of items in it. This will return an error if the condition is not met after a
+// 10 second timeout.
+func waitForCacheCount(store cache.Store, n int) error {
+	return wait.Poll(10*time.Millisecond, 10*time.Second, func() (bool, error) {
+		return len(store.List()) == n, nil
+	})
+}
+
+// TestMain adds klog flags to make debugging tests easier.
+func TestMain(m *testing.M) {
+	klog.InitFlags(flag.CommandLine)
+	os.Exit(m.Run())
+}
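The key testing technique in TestUpdatePDBStatusRetries is injecting a 409 Conflict through a reactor on the fake clientset and then checking what the caller does with it. The standalone sketch below distills that pattern; the ConfigMap resource and the "example"/"default" names are invented for illustration, and the method signatures assume the pre-context client-go vendored in this tree.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/kubernetes/fake"
	k8stesting "k8s.io/client-go/testing"
)

func main() {
	// Seed the fake clientset with an object we can try to update.
	client := fake.NewSimpleClientset(&v1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{Name: "example", Namespace: "default"},
	})

	// Reactor: fail updates on configmaps with a conflict, the same way the
	// test fails the first PDB status update.
	client.Fake.PrependReactor("update", "configmaps", func(a k8stesting.Action) (bool, runtime.Object, error) {
		conflict := apierrors.NewConflict(a.GetResource().GroupResource(), "example", fmt.Errorf("simulated conflict"))
		return true, nil, conflict
	})

	cm, _ := client.CoreV1().ConfigMaps("default").Get("example", metav1.GetOptions{})
	_, err := client.CoreV1().ConfigMaps("default").Update(cm)

	// The caller can distinguish a conflict from other failures and choose to
	// requeue for another sync instead of treating the object as broken.
	fmt.Println("update failed with conflict:", apierrors.IsConflict(err))
}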