Merge pull request #82152 from misterikkit/disruption

Fix retry logic in DisruptionController
commit 65fa4c979a, authored by Kubernetes Prow Robot on 2019-10-23 12:48:58 -07:00 and committed by GitHub
3 changed files with 172 additions and 31 deletions
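
In short, as the hunks below show: sync() used to treat every trySync error the same way and fall back to failSafe, while writePdbStatus retried the status write itself in a bounded loop (statusUpdateRetries) with a refresh helper. With this change, a ResourceVersion conflict from trySync is returned as-is so the PDB is simply requeued, writePdbStatus issues a single UpdateStatus call and leaves retries to the work queue, and a new test, TestUpdatePDBStatusRetries, covers the conflict path.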

Changed file 1 of 3: the package's Bazel BUILD file (go_library / go_test dependency lists)

@@ -32,7 +32,6 @@ go_library(
 "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
 "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
 "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
-"//staging/src/k8s.io/client-go/kubernetes/typed/policy/v1beta1:go_default_library",
 "//staging/src/k8s.io/client-go/listers/apps/v1:go_default_library",
 "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
 "//staging/src/k8s.io/client-go/listers/policy/v1beta1:go_default_library",
@@ -56,6 +55,7 @@ go_test(
 "//staging/src/k8s.io/api/core/v1:go_default_library",
 "//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
 "//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
+"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
 "//staging/src/k8s.io/apimachinery/pkg/api/meta/testrestmapper:go_default_library",
 "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
 "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
@@ -63,11 +63,14 @@ go_test(
 "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
 "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
 "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
+"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
 "//staging/src/k8s.io/client-go/informers:go_default_library",
 "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
 "//staging/src/k8s.io/client-go/scale/fake:go_default_library",
 "//staging/src/k8s.io/client-go/testing:go_default_library",
 "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
+"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
+"//vendor/k8s.io/klog:go_default_library",
 "//vendor/k8s.io/utils/pointer:go_default_library",
 ],
 )

Changed file 2 of 3: the DisruptionController implementation

@@ -39,7 +39,6 @@ import (
     clientset "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/kubernetes/scheme"
     v1core "k8s.io/client-go/kubernetes/typed/core/v1"
-    policyclientset "k8s.io/client-go/kubernetes/typed/policy/v1beta1"
     appsv1listers "k8s.io/client-go/listers/apps/v1"
     corelisters "k8s.io/client-go/listers/core/v1"
     policylisters "k8s.io/client-go/listers/policy/v1beta1"
@@ -53,8 +52,6 @@ import (
     "k8s.io/klog"
 )
 
-const statusUpdateRetries = 2
-
 // DeletionTimeout sets maximum time from the moment a pod is added to DisruptedPods in PDB.Status
 // to the time when the pod is expected to be seen by PDB controller as having been marked for deletion.
 // If the pod was not marked for deletion during that time it is assumed that it won't be deleted at
@@ -544,7 +541,13 @@ func (dc *DisruptionController) sync(key string) error {
         return err
     }
 
-    if err := dc.trySync(pdb); err != nil {
+    err = dc.trySync(pdb)
+    // If the reason for failure was a conflict, then allow this PDB update to be
+    // requeued without triggering the failSafe logic.
+    if errors.IsConflict(err) {
+        return err
+    }
+    if err != nil {
         klog.Errorf("Failed to sync pdb %s/%s: %v", pdb.Namespace, pdb.Name, err)
         return dc.failSafe(pdb)
     }
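
As an aside for readers of the hunk above (this snippet is illustrative and not part of the commit): apimachinery's errors package classifies a 409 Conflict distinctly from other failures, which is what lets sync() requeue conflicts while every other error still trips failSafe.

package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/runtime/schema"
)

func main() {
    // A conflict error, as the API server returns when the PDB's
    // ResourceVersion is stale; sync() now returns it so the key is requeued.
    // The PDB name here is a made-up example.
    conflict := errors.NewConflict(
        schema.GroupResource{Group: "policy", Resource: "poddisruptionbudgets"},
        "my-pdb", fmt.Errorf("the object has been modified"))
    fmt.Println(errors.IsConflict(conflict)) // true

    // Any other error still triggers dc.failSafe(pdb).
    fmt.Println(errors.IsConflict(fmt.Errorf("some other failure"))) // false
}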
@@ -785,29 +788,9 @@ func (dc *DisruptionController) updatePdbStatus(pdb *policy.PodDisruptionBudget,
     return dc.getUpdater()(newPdb)
 }
 
-// refresh tries to re-GET the given PDB. If there are any errors, it just
-// returns the old PDB. Intended to be used in a retry loop where it runs a
-// bounded number of times.
-func refresh(pdbClient policyclientset.PodDisruptionBudgetInterface, pdb *policy.PodDisruptionBudget) *policy.PodDisruptionBudget {
-    newPdb, err := pdbClient.Get(pdb.Name, metav1.GetOptions{})
-    if err == nil {
-        return newPdb
-    }
-    return pdb
-}
-
 func (dc *DisruptionController) writePdbStatus(pdb *policy.PodDisruptionBudget) error {
-    pdbClient := dc.kubeClient.PolicyV1beta1().PodDisruptionBudgets(pdb.Namespace)
-    st := pdb.Status
-
-    var err error
-    for i, pdb := 0, pdb; i < statusUpdateRetries; i, pdb = i+1, refresh(pdbClient, pdb) {
-        pdb.Status = st
-        if _, err = pdbClient.UpdateStatus(pdb); err == nil {
-            break
-        }
-    }
-
+    // If this update fails, don't retry it. Allow the failure to get handled &
+    // retried in `processNextWorkItem()`.
+    _, err := dc.kubeClient.PolicyV1beta1().PodDisruptionBudgets(pdb.Namespace).UpdateStatus(pdb)
     return err
 }
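
The comment above defers retries to processNextWorkItem(). A minimal sketch of the standard client-go work-queue pattern it refers to follows; the function name and structure are illustrative rather than the controller's literal code. An error from sync, such as the conflict returned above, re-adds the key with rate limiting, while a successful sync clears its backoff.

package main

import "k8s.io/client-go/util/workqueue"

// processOne is an illustrative stand-in for a controller's worker loop.
func processOne(queue workqueue.RateLimitingInterface, sync func(key string) error) bool {
    key, quit := queue.Get()
    if quit {
        return false
    }
    defer queue.Done(key)

    if err := sync(key.(string)); err != nil {
        // Failed sync (e.g. a conflict): requeue the key with exponential backoff.
        queue.AddRateLimited(key)
        return true
    }
    // Successful sync: drop any accumulated backoff for this key.
    queue.Forget(key)
    return true
}

func main() {
    q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
    q.Add("default/my-pdb") // example key
    processOne(q, func(key string) error { return nil })
}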

Changed file 3 of 3: the DisruptionController tests

@@ -17,16 +17,21 @@ limitations under the License.
 package disruption
 
 import (
+    "context"
+    "flag"
     "fmt"
+    "os"
     "runtime/debug"
+    "sync"
     "testing"
     "time"
 
     apps "k8s.io/api/apps/v1"
     autoscalingapi "k8s.io/api/autoscaling/v1"
-    "k8s.io/api/core/v1"
+    v1 "k8s.io/api/core/v1"
     policy "k8s.io/api/policy/v1beta1"
     apiequality "k8s.io/apimachinery/pkg/api/equality"
+    "k8s.io/apimachinery/pkg/api/errors"
     "k8s.io/apimachinery/pkg/api/meta/testrestmapper"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/runtime"
@@ -34,11 +39,14 @@ import (
     "k8s.io/apimachinery/pkg/types"
     "k8s.io/apimachinery/pkg/util/intstr"
     "k8s.io/apimachinery/pkg/util/uuid"
+    "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/client-go/informers"
     "k8s.io/client-go/kubernetes/fake"
     scalefake "k8s.io/client-go/scale/fake"
     core "k8s.io/client-go/testing"
     "k8s.io/client-go/tools/cache"
+    "k8s.io/client-go/util/workqueue"
+    "k8s.io/klog"
     _ "k8s.io/kubernetes/pkg/apis/core/install"
     "k8s.io/kubernetes/pkg/controller"
     utilpointer "k8s.io/utils/pointer"
@@ -97,6 +105,7 @@ type disruptionController struct {
     dStore  cache.Store
     ssStore cache.Store
 
+    coreClient  *fake.Clientset
     scaleClient *scalefake.FakeScaleClient
 }
@@ -109,7 +118,8 @@ var customGVK = schema.GroupVersionKind{
 func newFakeDisruptionController() (*disruptionController, *pdbStates) {
     ps := &pdbStates{}
 
-    informerFactory := informers.NewSharedInformerFactory(nil, controller.NoResyncPeriodFunc())
+    coreClient := fake.NewSimpleClientset()
+    informerFactory := informers.NewSharedInformerFactory(coreClient, controller.NoResyncPeriodFunc())
 
     scheme := runtime.NewScheme()
     scheme.AddKnownTypeWithName(customGVK, &v1.Service{})
@@ -122,7 +132,7 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) {
         informerFactory.Apps().V1().ReplicaSets(),
         informerFactory.Apps().V1().Deployments(),
         informerFactory.Apps().V1().StatefulSets(),
-        nil,
+        coreClient,
         testrestmapper.TestOnlyStaticRESTMapper(scheme),
         fakeScaleClient,
     )
@@ -134,6 +144,9 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) {
     dc.dListerSynced = alwaysReady
     dc.ssListerSynced = alwaysReady
 
+    informerFactory.Start(context.TODO().Done())
+    informerFactory.WaitForCacheSync(nil)
+
     return &disruptionController{
         dc,
         informerFactory.Core().V1().Pods().Informer().GetStore(),
@@ -142,6 +155,7 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) {
         informerFactory.Apps().V1().ReplicaSets().Informer().GetStore(),
         informerFactory.Apps().V1().Deployments().Informer().GetStore(),
         informerFactory.Apps().V1().StatefulSets().Informer().GetStore(),
+        coreClient,
         fakeScaleClient,
     }, ps
 }
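
Distilled from the helper above (a minimal sketch using the same client-go fake and informer APIs, not the test's literal code): the fake clientset now backs the shared informer factory, so objects written through the client show up in the informer caches once the factory is started and synced.

package main

import (
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes/fake"
)

func main() {
    // The fake clientset backs the shared informer factory, so Create/Update/Delete
    // calls made through it are reflected into the informer caches.
    client := fake.NewSimpleClientset()
    factory := informers.NewSharedInformerFactory(client, 0)
    podStore := factory.Core().V1().Pods().Informer().GetStore()

    stopCh := make(chan struct{})
    defer close(stopCh)
    factory.Start(stopCh)
    factory.WaitForCacheSync(stopCh)

    _ = podStore // reads from podStore now track writes made through client
}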
@@ -1025,3 +1039,144 @@ func TestDeploymentFinderFunction(t *testing.T) {
})
}
}
// This test checks that the disruption controller does not write stale data to
// a PDB status during race conditions with the eviction handler. Specifically,
// failed updates due to ResourceVersion conflict should not cause a stale value
// of PodDisruptionsAllowed to be written.
//
// In this test, PodDisruptionsAllowed starts at 2.
// (A) We will delete 1 pod and trigger DisruptionController to set
// PodDisruptionsAllowed to 1.
// (B) As the DisruptionController attempts this write, we will evict the
// remaining 2 pods and update PodDisruptionsAllowed to 0. (The real eviction
// handler would allow this because it still sees PodDisruptionsAllowed=2.)
// (C) If the DisruptionController writes PodDisruptionsAllowed=1 despite the
// resource conflict error, then there is a bug.
func TestUpdatePDBStatusRetries(t *testing.T) {
dc, _ := newFakeDisruptionController()
// Inject the production code over our fake impl
dc.getUpdater = func() updater { return dc.writePdbStatus }
// Create a PDB and 3 pods that match it.
pdb, pdbKey := newMinAvailablePodDisruptionBudget(t, intstr.FromInt(1))
pdb, err := dc.coreClient.PolicyV1beta1().PodDisruptionBudgets(pdb.Namespace).Create(pdb)
if err != nil {
t.Fatalf("Failed to create PDB: %v", err)
}
podNames := []string{"moe", "larry", "curly"}
for _, name := range podNames {
pod, _ := newPod(t, name)
_, err := dc.coreClient.CoreV1().Pods(pod.Namespace).Create(pod)
if err != nil {
t.Fatalf("Failed to create pod: %v", err)
}
}
// Block until the fake clientset writes are observable in the informer caches.
// FUN FACT: This guarantees that the informer caches have updated, but it does
// not guarantee that informer event handlers have completed. Fortunately,
// DisruptionController does most of its logic by reading from informer
// listers, so this guarantee is sufficient.
if err := waitForCacheCount(dc.pdbStore, 1); err != nil {
t.Fatalf("Failed to verify PDB in informer cache: %v", err)
}
if err := waitForCacheCount(dc.podStore, len(podNames)); err != nil {
t.Fatalf("Failed to verify pods in informer cache: %v", err)
}
// Sync DisruptionController once to update PDB status.
if err := dc.sync(pdbKey); err != nil {
t.Fatalf("Failed initial sync: %v", err)
}
// Evict simulates the visible effects of eviction in our fake client.
evict := func(podNames ...string) {
// These GVRs are copied from the generated fake code because they are not exported.
var (
podsResource = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
poddisruptionbudgetsResource = schema.GroupVersionResource{Group: "policy", Version: "v1beta1", Resource: "poddisruptionbudgets"}
)
// Bypass the coreClient.Fake and write directly to the ObjectTracker, because
// this helper will be called while the Fake is holding a lock.
obj, err := dc.coreClient.Tracker().Get(poddisruptionbudgetsResource, pdb.Namespace, pdb.Name)
if err != nil {
t.Fatalf("Failed to get PDB: %v", err)
}
updatedPDB := obj.(*policy.PodDisruptionBudget)
// Each eviction,
// - decrements PodDisruptionsAllowed
// - adds the pod to DisruptedPods
// - deletes the pod
updatedPDB.Status.PodDisruptionsAllowed -= int32(len(podNames))
updatedPDB.Status.DisruptedPods = make(map[string]metav1.Time)
for _, name := range podNames {
updatedPDB.Status.DisruptedPods[name] = metav1.NewTime(time.Now())
}
if err := dc.coreClient.Tracker().Update(poddisruptionbudgetsResource, updatedPDB, updatedPDB.Namespace); err != nil {
t.Fatalf("Eviction (PDB update) failed: %v", err)
}
for _, name := range podNames {
if err := dc.coreClient.Tracker().Delete(podsResource, "default", name); err != nil {
t.Fatalf("Eviction (pod delete) failed: %v", err)
}
}
}
// The fake kube client does not update ResourceVersion or check for conflicts.
// Instead, we add a reactor that returns a conflict error on the first PDB
// update and success after that.
var failOnce sync.Once
dc.coreClient.Fake.PrependReactor("update", "poddisruptionbudgets", func(a core.Action) (handled bool, obj runtime.Object, err error) {
failOnce.Do(func() {
// (B) Evict two pods and fail this update.
evict(podNames[1], podNames[2])
handled = true
err = errors.NewConflict(a.GetResource().GroupResource(), pdb.Name, fmt.Errorf("conflict"))
})
return handled, obj, err
})
// (A) Delete one pod
if err := dc.coreClient.CoreV1().Pods("default").Delete(podNames[0], &metav1.DeleteOptions{}); err != nil {
t.Fatal(err)
}
if err := waitForCacheCount(dc.podStore, len(podNames)-1); err != nil {
t.Fatalf("Failed to verify pods in informer cache: %v", err)
}
// The sync() function should either write a correct status which takes the
// evictions into account, or re-queue the PDB for another sync (by returning
// an error)
if err := dc.sync(pdbKey); err != nil {
t.Logf("sync() returned with error: %v", err)
} else {
t.Logf("sync() returned with no error")
}
// (C) Whether or not sync() returned an error, the PDB status should reflect
// the evictions that took place.
finalPDB, err := dc.coreClient.PolicyV1beta1().PodDisruptionBudgets("default").Get(pdb.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to get PDB: %v", err)
}
if expected, actual := int32(0), finalPDB.Status.PodDisruptionsAllowed; expected != actual {
t.Errorf("PodDisruptionsAllowed should be %d, got %d", expected, actual)
}
}
// waitForCacheCount blocks until the given cache store has the desired number
// of items in it. This will return an error if the condition is not met after a
// 10 second timeout.
func waitForCacheCount(store cache.Store, n int) error {
return wait.Poll(10*time.Millisecond, 10*time.Second, func() (bool, error) {
return len(store.List()) == n, nil
})
}
// TestMain adds klog flags to make debugging tests easier.
func TestMain(m *testing.M) {
klog.InitFlags(flag.CommandLine)
os.Exit(m.Run())
}
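
With klog's flags registered in TestMain, log verbosity can be raised while debugging these tests, for example by passing klog's standard -v flag through go test with -args -v=4 (an illustrative invocation, not taken from this PR).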