Add clock interface to disruption controller

To be able to write more precise unit tests in the future.

Change-Id: I8f45947dfacca501acd856849bd978fad0f735cd
Aldo Culquicondor 2022-07-26 15:11:53 -04:00
parent 0d46dc1f46
commit dad8454ebb
4 changed files with 64 additions and 20 deletions
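
For context, a minimal sketch (not part of this commit) of the testing pattern the change enables: a component holds an injected clock.Clock from k8s.io/utils/clock, production code supplies clock.RealClock{}, and a test supplies a clocktesting.FakeClock that it advances with Step, so expiry logic such as the 2-minute DeletionTimeout can be exercised without real sleeps. The expirationChecker type below is hypothetical, for illustration only.

package main

import (
	"fmt"
	"time"

	"k8s.io/utils/clock"
	clocktesting "k8s.io/utils/clock/testing"
)

// expirationChecker is a hypothetical stand-in for controller logic that asks
// whether a DisruptedPods entry has outlived DeletionTimeout.
type expirationChecker struct {
	clock clock.Clock // injected; production code would pass clock.RealClock{}
}

func (c expirationChecker) expired(addedAt time.Time, timeout time.Duration) bool {
	return c.clock.Since(addedAt) > timeout
}

func main() {
	// A test injects a fake clock and advances it deterministically.
	fakeClock := clocktesting.NewFakeClock(time.Now())
	checker := expirationChecker{clock: fakeClock}

	addedAt := fakeClock.Now()
	fmt.Println(checker.expired(addedAt, 2*time.Minute)) // false: no simulated time has passed

	fakeClock.Step(3 * time.Minute) // jump simulated time past the timeout
	fmt.Println(checker.expired(addedAt, 2*time.Minute)) // true
}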

View File

@@ -53,6 +53,7 @@ import (
 	"k8s.io/klog/v2"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
 	"k8s.io/kubernetes/pkg/controller"
+	"k8s.io/utils/clock"
 )
 
 // DeletionTimeout sets maximum time from the moment a pod is added to DisruptedPods in PDB.Status
@@ -65,7 +66,7 @@ import (
 // clock (via ntp for example). Otherwise PodDisruptionBudget controller may not provide enough
 // protection against unwanted pod disruptions.
 const (
-	DeletionTimeout = 2 * 60 * time.Second
+	DeletionTimeout = 2 * time.Minute
 )
 
 type updater func(context.Context, *policy.PodDisruptionBudget) error
@@ -103,6 +104,8 @@ type DisruptionController struct {
 	recorder record.EventRecorder
 
 	getUpdater func() updater
+
+	clock clock.Clock
 }
 
 // controllerAndScale is used to return (controller, scale) pairs from the
@@ -127,11 +130,41 @@ func NewDisruptionController(
 	restMapper apimeta.RESTMapper,
 	scaleNamespacer scaleclient.ScalesGetter,
 	discoveryClient discovery.DiscoveryInterface,
+) *DisruptionController {
+	return NewDisruptionControllerInternal(
+		podInformer,
+		pdbInformer,
+		rcInformer,
+		rsInformer,
+		dInformer,
+		ssInformer,
+		kubeClient,
+		restMapper,
+		scaleNamespacer,
+		discoveryClient,
+		clock.RealClock{})
+}
+
+// NewDisruptionControllerInternal allows to set a clock and
+// stalePodDisruptionTimeout
+// It is only supposed to be used by tests.
+func NewDisruptionControllerInternal(
+	podInformer coreinformers.PodInformer,
+	pdbInformer policyinformers.PodDisruptionBudgetInformer,
+	rcInformer coreinformers.ReplicationControllerInformer,
+	rsInformer appsv1informers.ReplicaSetInformer,
+	dInformer appsv1informers.DeploymentInformer,
+	ssInformer appsv1informers.StatefulSetInformer,
+	kubeClient clientset.Interface,
+	restMapper apimeta.RESTMapper,
+	scaleNamespacer scaleclient.ScalesGetter,
+	discoveryClient discovery.DiscoveryInterface,
+	clock clock.WithTicker,
 ) *DisruptionController {
 	dc := &DisruptionController{
 		kubeClient: kubeClient,
-		queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "disruption"),
-		recheckQueue: workqueue.NewNamedDelayingQueue("disruption_recheck"),
+		queue: workqueue.NewRateLimitingQueueWithDelayingInterface(workqueue.NewDelayingQueueWithCustomClock(clock, "disruption"), workqueue.DefaultControllerRateLimiter()),
+		recheckQueue: workqueue.NewDelayingQueueWithCustomClock(clock, "disruption_recheck"),
 		broadcaster: record.NewBroadcaster(),
 	}
 	dc.recorder = dc.broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "controllermanager"})
@@ -172,6 +205,8 @@ func NewDisruptionController(
 	dc.scaleNamespacer = scaleNamespacer
 	dc.discoveryClient = discoveryClient
 
+	dc.clock = clock
+
 	return dc
 }
@@ -564,9 +599,9 @@ func (dc *DisruptionController) processNextRecheckWorkItem() bool {
 }
 
 func (dc *DisruptionController) sync(ctx context.Context, key string) error {
-	startTime := time.Now()
+	startTime := dc.clock.Now()
 	defer func() {
-		klog.V(4).Infof("Finished syncing PodDisruptionBudget %q (%v)", key, time.Since(startTime))
+		klog.V(4).Infof("Finished syncing PodDisruptionBudget %q (%v)", key, dc.clock.Since(startTime))
 	}()
 
 	namespace, name, err := cache.SplitMetaNamespaceKey(key)
@@ -617,7 +652,7 @@ func (dc *DisruptionController) trySync(ctx context.Context, pdb *policy.PodDisr
 			strings.Join(unmanagedPods, ",'"))
 	}
 
-	currentTime := time.Now()
+	currentTime := dc.clock.Now()
 	disruptedPods, recheckTime := dc.buildDisruptedPodMap(pods, pdb, currentTime)
 	currentHealthy := countHealthyPods(pods, disruptedPods, currentTime)
 	err = dc.updatePdbStatus(ctx, pdb, currentHealthy, desiredHealthy, expectedCount, disruptedPods)

View File

@@ -27,11 +27,12 @@ import (
 	"testing"
 	"time"
 
+	"github.com/google/go-cmp/cmp"
+	"github.com/google/go-cmp/cmp/cmpopts"
 	apps "k8s.io/api/apps/v1"
 	autoscalingapi "k8s.io/api/autoscaling/v1"
 	v1 "k8s.io/api/core/v1"
 	policy "k8s.io/api/policy/v1"
-	apiequality "k8s.io/apimachinery/pkg/api/equality"
 	"k8s.io/apimachinery/pkg/api/errors"
 	apimeta "k8s.io/apimachinery/pkg/api/meta"
 	"k8s.io/apimachinery/pkg/api/meta/testrestmapper"
@@ -52,6 +53,7 @@ import (
 	"k8s.io/klog/v2"
 	_ "k8s.io/kubernetes/pkg/apis/core/install"
 	"k8s.io/kubernetes/pkg/controller"
+	clocktesting "k8s.io/utils/clock/testing"
 	utilpointer "k8s.io/utils/pointer"
 )
@@ -72,8 +74,8 @@ func (ps *pdbStates) Get(key string) policy.PodDisruptionBudget {
 	return (*ps)[key]
 }
 
-func (ps *pdbStates) VerifyPdbStatus(t *testing.T, key string, disruptionsAllowed, currentHealthy, desiredHealthy, expectedPods int32,
-	disruptedPodMap map[string]metav1.Time) {
+func (ps *pdbStates) VerifyPdbStatus(t *testing.T, key string, disruptionsAllowed, currentHealthy, desiredHealthy, expectedPods int32, disruptedPodMap map[string]metav1.Time) {
+	t.Helper()
 	actualPDB := ps.Get(key)
 	actualConditions := actualPDB.Status.Conditions
 	actualPDB.Status.Conditions = nil
@@ -86,9 +88,8 @@ func (ps *pdbStates) VerifyPdbStatus(t *testing.T, key string, disruptionsAllowe
 		ObservedGeneration: actualPDB.Generation,
 	}
 	actualStatus := actualPDB.Status
-	if !apiequality.Semantic.DeepEqual(actualStatus, expectedStatus) {
-		debug.PrintStack()
-		t.Fatalf("PDB %q status mismatch. Expected %+v but got %+v.", key, expectedStatus, actualStatus)
+	if diff := cmp.Diff(expectedStatus, actualStatus, cmpopts.EquateEmpty()); diff != "" {
+		t.Fatalf("PDB %q status mismatch (-want,+got):\n%s", key, diff)
 	}
 
 	cond := apimeta.FindStatusCondition(actualConditions, policy.DisruptionAllowedCondition)
@@ -158,8 +159,9 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) {
 	fakeDiscovery := &discoveryfake.FakeDiscovery{
 		Fake: &core.Fake{},
 	}
+	fakeClock := clocktesting.NewFakeClock(time.Now())
 
-	dc := NewDisruptionController(
+	dc := NewDisruptionControllerInternal(
 		informerFactory.Core().V1().Pods(),
 		informerFactory.Policy().V1().PodDisruptionBudgets(),
 		informerFactory.Core().V1().ReplicationControllers(),
@@ -170,6 +172,7 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) {
 		testrestmapper.TestOnlyStaticRESTMapper(scheme),
 		fakeScaleClient,
 		fakeDiscovery,
+		fakeClock,
 	)
 	dc.getUpdater = func() updater { return ps.Set }
 	dc.podListerSynced = alwaysReady
@@ -990,17 +993,17 @@ func TestUpdateDisruptedPods(t *testing.T) {
 	dc, ps := newFakeDisruptionController()
 
 	dc.recheckQueue = workqueue.NewNamedDelayingQueue("pdb_queue")
 	pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromInt(1))
-	currentTime := time.Now()
+	currentTime := dc.clock.Now()
 	pdb.Status.DisruptedPods = map[string]metav1.Time{
 		"p1": {Time: currentTime}, // Should be removed, pod deletion started.
-		"p2": {Time: currentTime.Add(-5 * time.Minute)}, // Should be removed, expired.
-		"p3": {Time: currentTime}, // Should remain, pod untouched.
+		"p2": {Time: currentTime.Add(-3 * time.Minute)}, // Should be removed, expired.
+		"p3": {Time: currentTime.Add(-time.Minute)}, // Should remain, pod untouched.
 		"notthere": {Time: currentTime}, // Should be removed, pod deleted.
 	}
 	add(t, dc.pdbStore, pdb)
 
 	pod1, _ := newPod(t, "p1")
-	pod1.DeletionTimestamp = &metav1.Time{Time: time.Now()}
+	pod1.DeletionTimestamp = &metav1.Time{Time: dc.clock.Now()}
 	pod2, _ := newPod(t, "p2")
 	pod3, _ := newPod(t, "p3")
@@ -1010,7 +1013,7 @@ func TestUpdateDisruptedPods(t *testing.T) {
 	dc.sync(context.TODO(), pdbName)
-	ps.VerifyPdbStatus(t, pdbName, 0, 1, 1, 3, map[string]metav1.Time{"p3": {Time: currentTime}})
+	ps.VerifyPdbStatus(t, pdbName, 0, 1, 1, 3, map[string]metav1.Time{"p3": {Time: currentTime.Add(-time.Minute)}})
 }
 
 func TestBasicFinderFunctions(t *testing.T) {
@@ -1284,7 +1287,7 @@ func TestUpdatePDBStatusRetries(t *testing.T) {
 		updatedPDB.Status.DisruptionsAllowed -= int32(len(podNames))
 		updatedPDB.Status.DisruptedPods = make(map[string]metav1.Time)
 		for _, name := range podNames {
-			updatedPDB.Status.DisruptedPods[name] = metav1.NewTime(time.Now())
+			updatedPDB.Status.DisruptedPods[name] = metav1.NewTime(dc.clock.Now())
 		}
 		if err := dc.coreClient.Tracker().Update(poddisruptionbudgetsResource, updatedPDB, updatedPDB.Namespace); err != nil {
 			t.Fatalf("Eviction (PDB update) failed: %v", err)

View File

@@ -50,6 +50,13 @@ func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitin
 	}
 }
 
+func NewRateLimitingQueueWithDelayingInterface(di DelayingInterface, rateLimiter RateLimiter) RateLimitingInterface {
+	return &rateLimitingType{
+		DelayingInterface: di,
+		rateLimiter:       rateLimiter,
+	}
+}
+
 // rateLimitingType wraps an Interface and provides rateLimited re-enquing
 type rateLimitingType struct {
 	DelayingInterface
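
Usage note (not part of the diff): a minimal sketch of how the new constructor can be composed with a custom-clock delaying queue, mirroring the wiring in the disruption controller above. It assumes only the existing workqueue helpers NewDelayingQueueWithCustomClock and DefaultControllerRateLimiter plus the fake clock from k8s.io/utils/clock/testing.

package main

import (
	"time"

	"k8s.io/client-go/util/workqueue"
	clocktesting "k8s.io/utils/clock/testing"
)

func main() {
	// A delaying queue driven by a fake clock: AddAfter deadlines are measured
	// against the injected clock, not wall-clock time.
	fakeClock := clocktesting.NewFakeClock(time.Now())
	delaying := workqueue.NewDelayingQueueWithCustomClock(fakeClock, "example")

	// Wrap it with rate limiting via the constructor added in this commit.
	queue := workqueue.NewRateLimitingQueueWithDelayingInterface(delaying, workqueue.DefaultControllerRateLimiter())

	// The item is held until the fake clock passes the delay; a test can make
	// it ready immediately by stepping the clock instead of sleeping.
	queue.AddAfter("some-key", time.Minute)
	fakeClock.Step(2 * time.Minute)
	queue.ShutDown()
}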

View File

@@ -24,7 +24,6 @@ import (
 	"time"
 
 	"github.com/google/go-cmp/cmp"
 	v1 "k8s.io/api/core/v1"
 	policyv1 "k8s.io/api/policy/v1"
 	"k8s.io/api/policy/v1beta1"