kube-controller-manager: finish conversion to contextual logging

This removes all exceptions and fixes the remaining unconverted log calls.
This commit is contained in:
Patrick Ohly 2023-07-12 10:45:00 +02:00
parent 1b8ddf6b79
commit 7d064812bb
17 changed files with 177 additions and 161 deletions

View File

@ -41,4 +41,5 @@ rules:
- k8s.io/kubernetes/pkg/proxy/util - k8s.io/kubernetes/pkg/proxy/util
- k8s.io/kubernetes/pkg/proxy/util/testing - k8s.io/kubernetes/pkg/proxy/util/testing
- k8s.io/kubernetes/pkg/util/slice - k8s.io/kubernetes/pkg/util/slice
- k8s.io/kubernetes/pkg/util/sysctl - k8s.io/kubernetes/pkg/util/sysctl
- k8s.io/kubernetes/test/utils/ktesting

View File

@ -43,6 +43,7 @@ func startEndpointSliceController(ctx context.Context, controllerContext Control
func startEndpointSliceMirroringController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { func startEndpointSliceMirroringController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
go endpointslicemirroringcontroller.NewController( go endpointslicemirroringcontroller.NewController(
ctx,
controllerContext.InformerFactory.Core().V1().Endpoints(), controllerContext.InformerFactory.Core().V1().Endpoints(),
controllerContext.InformerFactory.Discovery().V1().EndpointSlices(), controllerContext.InformerFactory.Discovery().V1().EndpointSlices(),
controllerContext.InformerFactory.Core().V1().Services(), controllerContext.InformerFactory.Core().V1().Services(),

View File

@ -34,19 +34,6 @@ contextual k8s.io/kubernetes/cmd/kube-scheduler/.*
contextual k8s.io/kubernetes/pkg/controller/.* contextual k8s.io/kubernetes/pkg/controller/.*
contextual k8s.io/kubernetes/test/e2e/dra/.* contextual k8s.io/kubernetes/test/e2e/dra/.*
# Most of kube-controller-manager has been converted, but not everything. At
# this point it is easier to list the exceptions.
-contextual k8s.io/kubernetes/pkg/controller/controller_ref_manager.go
-contextual k8s.io/kubernetes/pkg/controller/controller_utils.go
-contextual k8s.io/kubernetes/pkg/controller/endpoint/.*
-contextual k8s.io/kubernetes/pkg/controller/endpointslicemirroring/.*
-contextual k8s.io/kubernetes/pkg/controller/garbagecollector/.*
-contextual k8s.io/kubernetes/pkg/controller/nodeipam/.*
-contextual k8s.io/kubernetes/pkg/controller/podgc/.*
-contextual k8s.io/kubernetes/pkg/controller/replicaset/.*
-contextual k8s.io/kubernetes/pkg/controller/volume/pvcprotection/pvc_protection_controller_test.go
-contextual k8s.io/kubernetes/pkg/controller/volume/pvprotection/pv_protection_controller_test.go
# As long as contextual logging is alpha or beta, all WithName, WithValues, # As long as contextual logging is alpha or beta, all WithName, WithValues,
# NewContext calls have to go through klog. Once it is GA, we can lift # NewContext calls have to go through klog. Once it is GA, we can lift
# this restriction. Whether we then do a global search/replace remains # this restriction. Whether we then do a global search/replace remains

View File

@ -236,8 +236,8 @@ func (m *PodControllerRefManager) AdoptPod(ctx context.Context, pod *v1.Pod) err
// ReleasePod sends a patch to free the pod from the control of the controller. // ReleasePod sends a patch to free the pod from the control of the controller.
// It returns the error if the patching fails. 404 and 422 errors are ignored. // It returns the error if the patching fails. 404 and 422 errors are ignored.
func (m *PodControllerRefManager) ReleasePod(ctx context.Context, pod *v1.Pod) error { func (m *PodControllerRefManager) ReleasePod(ctx context.Context, pod *v1.Pod) error {
klog.V(2).Infof("patching pod %s_%s to remove its controllerRef to %s/%s:%s", logger := klog.FromContext(ctx)
pod.Namespace, pod.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.Controller.GetName()) logger.V(2).Info("Patching pod to remove its controllerRef", "pod", klog.KObj(pod), "gvk", m.controllerKind, "controller", m.Controller.GetName())
patchBytes, err := GenerateDeleteOwnerRefStrategicMergeBytes(pod.UID, []types.UID{m.Controller.GetUID()}, m.finalizers...) patchBytes, err := GenerateDeleteOwnerRefStrategicMergeBytes(pod.UID, []types.UID{m.Controller.GetUID()}, m.finalizers...)
if err != nil { if err != nil {
return err return err
@ -361,8 +361,8 @@ func (m *ReplicaSetControllerRefManager) AdoptReplicaSet(ctx context.Context, rs
// ReleaseReplicaSet sends a patch to free the ReplicaSet from the control of the Deployment controller. // ReleaseReplicaSet sends a patch to free the ReplicaSet from the control of the Deployment controller.
// It returns the error if the patching fails. 404 and 422 errors are ignored. // It returns the error if the patching fails. 404 and 422 errors are ignored.
func (m *ReplicaSetControllerRefManager) ReleaseReplicaSet(ctx context.Context, replicaSet *apps.ReplicaSet) error { func (m *ReplicaSetControllerRefManager) ReleaseReplicaSet(ctx context.Context, replicaSet *apps.ReplicaSet) error {
klog.V(2).Infof("patching ReplicaSet %s_%s to remove its controllerRef to %s/%s:%s", logger := klog.FromContext(ctx)
replicaSet.Namespace, replicaSet.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.Controller.GetName()) logger.V(2).Info("Patching ReplicaSet to remove its controllerRef", "replicaSet", klog.KObj(replicaSet), "gvk", m.controllerKind, "controller", m.Controller.GetName())
patchBytes, err := GenerateDeleteOwnerRefStrategicMergeBytes(replicaSet.UID, []types.UID{m.Controller.GetUID()}) patchBytes, err := GenerateDeleteOwnerRefStrategicMergeBytes(replicaSet.UID, []types.UID{m.Controller.GetUID()})
if err != nil { if err != nil {
return err return err
@ -499,8 +499,8 @@ func (m *ControllerRevisionControllerRefManager) AdoptControllerRevision(ctx con
// ReleaseControllerRevision sends a patch to free the ControllerRevision from the control of its controller. // ReleaseControllerRevision sends a patch to free the ControllerRevision from the control of its controller.
// It returns the error if the patching fails. 404 and 422 errors are ignored. // It returns the error if the patching fails. 404 and 422 errors are ignored.
func (m *ControllerRevisionControllerRefManager) ReleaseControllerRevision(ctx context.Context, history *apps.ControllerRevision) error { func (m *ControllerRevisionControllerRefManager) ReleaseControllerRevision(ctx context.Context, history *apps.ControllerRevision) error {
klog.V(2).Infof("patching ControllerRevision %s_%s to remove its controllerRef to %s/%s:%s", logger := klog.FromContext(ctx)
history.Namespace, history.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.Controller.GetName()) logger.V(2).Info("Patching ControllerRevision to remove its controllerRef", "controllerRevision", klog.KObj(history), "gvk", m.controllerKind, "controller", m.Controller.GetName())
patchBytes, err := GenerateDeleteOwnerRefStrategicMergeBytes(history.UID, []types.UID{m.Controller.GetUID()}) patchBytes, err := GenerateDeleteOwnerRefStrategicMergeBytes(history.UID, []types.UID{m.Controller.GetUID()})
if err != nil { if err != nil {
return err return err

View File

@ -146,15 +146,15 @@ var ExpKeyFunc = func(obj interface{}) (string, error) {
// types of controllers, because the keys might conflict across types. // types of controllers, because the keys might conflict across types.
type ControllerExpectationsInterface interface { type ControllerExpectationsInterface interface {
GetExpectations(controllerKey string) (*ControlleeExpectations, bool, error) GetExpectations(controllerKey string) (*ControlleeExpectations, bool, error)
SatisfiedExpectations(controllerKey string) bool SatisfiedExpectations(logger klog.Logger, controllerKey string) bool
DeleteExpectations(controllerKey string) DeleteExpectations(logger klog.Logger, controllerKey string)
SetExpectations(controllerKey string, add, del int) error SetExpectations(logger klog.Logger, controllerKey string, add, del int) error
ExpectCreations(controllerKey string, adds int) error ExpectCreations(logger klog.Logger, controllerKey string, adds int) error
ExpectDeletions(controllerKey string, dels int) error ExpectDeletions(logger klog.Logger, controllerKey string, dels int) error
CreationObserved(controllerKey string) CreationObserved(logger klog.Logger, controllerKey string)
DeletionObserved(controllerKey string) DeletionObserved(logger klog.Logger, controllerKey string)
RaiseExpectations(controllerKey string, add, del int) RaiseExpectations(logger klog.Logger, controllerKey string, add, del int)
LowerExpectations(controllerKey string, add, del int) LowerExpectations(logger klog.Logger, controllerKey string, add, del int)
} }
// ControllerExpectations is a cache mapping controllers to what they expect to see before being woken up for a sync. // ControllerExpectations is a cache mapping controllers to what they expect to see before being woken up for a sync.
@ -172,10 +172,11 @@ func (r *ControllerExpectations) GetExpectations(controllerKey string) (*Control
} }
// DeleteExpectations deletes the expectations of the given controller from the TTLStore. // DeleteExpectations deletes the expectations of the given controller from the TTLStore.
func (r *ControllerExpectations) DeleteExpectations(controllerKey string) { func (r *ControllerExpectations) DeleteExpectations(logger klog.Logger, controllerKey string) {
if exp, exists, err := r.GetByKey(controllerKey); err == nil && exists { if exp, exists, err := r.GetByKey(controllerKey); err == nil && exists {
if err := r.Delete(exp); err != nil { if err := r.Delete(exp); err != nil {
klog.V(2).Infof("Error deleting expectations for controller %v: %v", controllerKey, err)
logger.V(2).Info("Error deleting expectations", "controller", controllerKey, "err", err)
} }
} }
} }
@ -183,27 +184,27 @@ func (r *ControllerExpectations) DeleteExpectations(controllerKey string) {
// SatisfiedExpectations returns true if the required adds/dels for the given controller have been observed. // SatisfiedExpectations returns true if the required adds/dels for the given controller have been observed.
// Add/del counts are established by the controller at sync time, and updated as controllees are observed by the controller // Add/del counts are established by the controller at sync time, and updated as controllees are observed by the controller
// manager. // manager.
func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) bool { func (r *ControllerExpectations) SatisfiedExpectations(logger klog.Logger, controllerKey string) bool {
if exp, exists, err := r.GetExpectations(controllerKey); exists { if exp, exists, err := r.GetExpectations(controllerKey); exists {
if exp.Fulfilled() { if exp.Fulfilled() {
klog.V(4).InfoS("Controller expectations fulfilled", "expectations", exp) logger.V(4).Info("Controller expectations fulfilled", "expectations", exp)
return true return true
} else if exp.isExpired() { } else if exp.isExpired() {
klog.V(4).InfoS("Controller expectations expired", "expectations", exp) logger.V(4).Info("Controller expectations expired", "expectations", exp)
return true return true
} else { } else {
klog.V(4).InfoS("Controller still waiting on expectations", "expectations", exp) logger.V(4).Info("Controller still waiting on expectations", "expectations", exp)
return false return false
} }
} else if err != nil { } else if err != nil {
klog.V(2).Infof("Error encountered while checking expectations %#v, forcing sync", err) logger.V(2).Info("Error encountered while checking expectations, forcing sync", "err", err)
} else { } else {
// When a new controller is created, it doesn't have expectations. // When a new controller is created, it doesn't have expectations.
// When it doesn't see expected watch events for > TTL, the expectations expire. // When it doesn't see expected watch events for > TTL, the expectations expire.
// - In this case it wakes up, creates/deletes controllees, and sets expectations again. // - In this case it wakes up, creates/deletes controllees, and sets expectations again.
// When it has satisfied expectations and no controllees need to be created/destroyed > TTL, the expectations expire. // When it has satisfied expectations and no controllees need to be created/destroyed > TTL, the expectations expire.
// - In this case it continues without setting expectations till it needs to create/delete controllees. // - In this case it continues without setting expectations till it needs to create/delete controllees.
klog.V(4).Infof("Controller %v either never recorded expectations, or the ttl expired.", controllerKey) logger.V(4).Info("Controller either never recorded expectations, or the ttl expired", "controller", controllerKey)
} }
// Trigger a sync if we either encountered and error (which shouldn't happen since we're // Trigger a sync if we either encountered and error (which shouldn't happen since we're
// getting from local store) or this controller hasn't established expectations. // getting from local store) or this controller hasn't established expectations.
@ -218,46 +219,46 @@ func (exp *ControlleeExpectations) isExpired() bool {
} }
// SetExpectations registers new expectations for the given controller. Forgets existing expectations. // SetExpectations registers new expectations for the given controller. Forgets existing expectations.
func (r *ControllerExpectations) SetExpectations(controllerKey string, add, del int) error { func (r *ControllerExpectations) SetExpectations(logger klog.Logger, controllerKey string, add, del int) error {
exp := &ControlleeExpectations{add: int64(add), del: int64(del), key: controllerKey, timestamp: clock.RealClock{}.Now()} exp := &ControlleeExpectations{add: int64(add), del: int64(del), key: controllerKey, timestamp: clock.RealClock{}.Now()}
klog.V(4).InfoS("Setting expectations", "expectations", exp) logger.V(4).Info("Setting expectations", "expectations", exp)
return r.Add(exp) return r.Add(exp)
} }
func (r *ControllerExpectations) ExpectCreations(controllerKey string, adds int) error { func (r *ControllerExpectations) ExpectCreations(logger klog.Logger, controllerKey string, adds int) error {
return r.SetExpectations(controllerKey, adds, 0) return r.SetExpectations(logger, controllerKey, adds, 0)
} }
func (r *ControllerExpectations) ExpectDeletions(controllerKey string, dels int) error { func (r *ControllerExpectations) ExpectDeletions(logger klog.Logger, controllerKey string, dels int) error {
return r.SetExpectations(controllerKey, 0, dels) return r.SetExpectations(logger, controllerKey, 0, dels)
} }
// Decrements the expectation counts of the given controller. // Decrements the expectation counts of the given controller.
func (r *ControllerExpectations) LowerExpectations(controllerKey string, add, del int) { func (r *ControllerExpectations) LowerExpectations(logger klog.Logger, controllerKey string, add, del int) {
if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists { if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists {
exp.Add(int64(-add), int64(-del)) exp.Add(int64(-add), int64(-del))
// The expectations might've been modified since the update on the previous line. // The expectations might've been modified since the update on the previous line.
klog.V(4).InfoS("Lowered expectations", "expectations", exp) logger.V(4).Info("Lowered expectations", "expectations", exp)
} }
} }
// Increments the expectation counts of the given controller. // Increments the expectation counts of the given controller.
func (r *ControllerExpectations) RaiseExpectations(controllerKey string, add, del int) { func (r *ControllerExpectations) RaiseExpectations(logger klog.Logger, controllerKey string, add, del int) {
if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists { if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists {
exp.Add(int64(add), int64(del)) exp.Add(int64(add), int64(del))
// The expectations might've been modified since the update on the previous line. // The expectations might've been modified since the update on the previous line.
klog.V(4).Infof("Raised expectations", "expectations", exp) logger.V(4).Info("Raised expectations", "expectations", exp)
} }
} }
// CreationObserved atomically decrements the `add` expectation count of the given controller. // CreationObserved atomically decrements the `add` expectation count of the given controller.
func (r *ControllerExpectations) CreationObserved(controllerKey string) { func (r *ControllerExpectations) CreationObserved(logger klog.Logger, controllerKey string) {
r.LowerExpectations(controllerKey, 1, 0) r.LowerExpectations(logger, controllerKey, 1, 0)
} }
// DeletionObserved atomically decrements the `del` expectation count of the given controller. // DeletionObserved atomically decrements the `del` expectation count of the given controller.
func (r *ControllerExpectations) DeletionObserved(controllerKey string) { func (r *ControllerExpectations) DeletionObserved(logger klog.Logger, controllerKey string) {
r.LowerExpectations(controllerKey, 0, 1) r.LowerExpectations(logger, controllerKey, 0, 1)
} }
// ControlleeExpectations track controllee creates/deletes. // ControlleeExpectations track controllee creates/deletes.
@ -349,47 +350,47 @@ func (u *UIDTrackingControllerExpectations) GetUIDs(controllerKey string) sets.S
} }
// ExpectDeletions records expectations for the given deleteKeys, against the given controller. // ExpectDeletions records expectations for the given deleteKeys, against the given controller.
func (u *UIDTrackingControllerExpectations) ExpectDeletions(rcKey string, deletedKeys []string) error { func (u *UIDTrackingControllerExpectations) ExpectDeletions(logger klog.Logger, rcKey string, deletedKeys []string) error {
expectedUIDs := sets.NewString() expectedUIDs := sets.NewString()
for _, k := range deletedKeys { for _, k := range deletedKeys {
expectedUIDs.Insert(k) expectedUIDs.Insert(k)
} }
klog.V(4).Infof("Controller %v waiting on deletions for: %+v", rcKey, deletedKeys) logger.V(4).Info("Controller waiting on deletions", "controller", rcKey, "keys", deletedKeys)
u.uidStoreLock.Lock() u.uidStoreLock.Lock()
defer u.uidStoreLock.Unlock() defer u.uidStoreLock.Unlock()
if existing := u.GetUIDs(rcKey); existing != nil && existing.Len() != 0 { if existing := u.GetUIDs(rcKey); existing != nil && existing.Len() != 0 {
klog.Errorf("Clobbering existing delete keys: %+v", existing) logger.Error(nil, "Clobbering existing delete keys", "keys", existing)
} }
if err := u.uidStore.Add(&UIDSet{expectedUIDs, rcKey}); err != nil { if err := u.uidStore.Add(&UIDSet{expectedUIDs, rcKey}); err != nil {
return err return err
} }
return u.ControllerExpectationsInterface.ExpectDeletions(rcKey, expectedUIDs.Len()) return u.ControllerExpectationsInterface.ExpectDeletions(logger, rcKey, expectedUIDs.Len())
} }
// DeletionObserved records the given deleteKey as a deletion, for the given rc. // DeletionObserved records the given deleteKey as a deletion, for the given rc.
func (u *UIDTrackingControllerExpectations) DeletionObserved(rcKey, deleteKey string) { func (u *UIDTrackingControllerExpectations) DeletionObserved(logger klog.Logger, rcKey, deleteKey string) {
u.uidStoreLock.Lock() u.uidStoreLock.Lock()
defer u.uidStoreLock.Unlock() defer u.uidStoreLock.Unlock()
uids := u.GetUIDs(rcKey) uids := u.GetUIDs(rcKey)
if uids != nil && uids.Has(deleteKey) { if uids != nil && uids.Has(deleteKey) {
klog.V(4).Infof("Controller %v received delete for pod %v", rcKey, deleteKey) logger.V(4).Info("Controller received delete for pod", "controller", rcKey, "key", deleteKey)
u.ControllerExpectationsInterface.DeletionObserved(rcKey) u.ControllerExpectationsInterface.DeletionObserved(logger, rcKey)
uids.Delete(deleteKey) uids.Delete(deleteKey)
} }
} }
// DeleteExpectations deletes the UID set and invokes DeleteExpectations on the // DeleteExpectations deletes the UID set and invokes DeleteExpectations on the
// underlying ControllerExpectationsInterface. // underlying ControllerExpectationsInterface.
func (u *UIDTrackingControllerExpectations) DeleteExpectations(rcKey string) { func (u *UIDTrackingControllerExpectations) DeleteExpectations(logger klog.Logger, rcKey string) {
u.uidStoreLock.Lock() u.uidStoreLock.Lock()
defer u.uidStoreLock.Unlock() defer u.uidStoreLock.Unlock()
u.ControllerExpectationsInterface.DeleteExpectations(rcKey) u.ControllerExpectationsInterface.DeleteExpectations(logger, rcKey)
if uidExp, exists, err := u.uidStore.GetByKey(rcKey); err == nil && exists { if uidExp, exists, err := u.uidStore.GetByKey(rcKey); err == nil && exists {
if err := u.uidStore.Delete(uidExp); err != nil { if err := u.uidStore.Delete(uidExp); err != nil {
klog.V(2).Infof("Error deleting uid expectations for controller %v: %v", rcKey, err) logger.V(2).Info("Error deleting uid expectations", "controller", rcKey, "err", err)
} }
} }
} }
@ -587,12 +588,13 @@ func (r RealPodControl) createPods(ctx context.Context, namespace string, pod *v
} }
return err return err
} }
logger := klog.FromContext(ctx)
accessor, err := meta.Accessor(object) accessor, err := meta.Accessor(object)
if err != nil { if err != nil {
klog.Errorf("parentObject does not have ObjectMeta, %v", err) logger.Error(err, "parentObject does not have ObjectMeta")
return nil return nil
} }
klog.V(4).Infof("Controller %v created pod %v", accessor.GetName(), newPod.Name) logger.V(4).Info("Controller created pod", "controller", accessor.GetName(), "pod", klog.KObj(newPod))
r.Recorder.Eventf(object, v1.EventTypeNormal, SuccessfulCreatePodReason, "Created pod: %v", newPod.Name) r.Recorder.Eventf(object, v1.EventTypeNormal, SuccessfulCreatePodReason, "Created pod: %v", newPod.Name)
return nil return nil
@ -603,10 +605,11 @@ func (r RealPodControl) DeletePod(ctx context.Context, namespace string, podID s
if err != nil { if err != nil {
return fmt.Errorf("object does not have ObjectMeta, %v", err) return fmt.Errorf("object does not have ObjectMeta, %v", err)
} }
klog.V(2).InfoS("Deleting pod", "controller", accessor.GetName(), "pod", klog.KRef(namespace, podID)) logger := klog.FromContext(ctx)
logger.V(2).Info("Deleting pod", "controller", accessor.GetName(), "pod", klog.KRef(namespace, podID))
if err := r.KubeClient.CoreV1().Pods(namespace).Delete(ctx, podID, metav1.DeleteOptions{}); err != nil { if err := r.KubeClient.CoreV1().Pods(namespace).Delete(ctx, podID, metav1.DeleteOptions{}); err != nil {
if apierrors.IsNotFound(err) { if apierrors.IsNotFound(err) {
klog.V(4).Infof("pod %v/%v has already been deleted.", namespace, podID) logger.V(4).Info("Pod has already been deleted.", "pod", klog.KRef(namespace, podID))
return err return err
} }
r.Recorder.Eventf(object, v1.EventTypeWarning, FailedDeletePodReason, "Error deleting: %v", err) r.Recorder.Eventf(object, v1.EventTypeWarning, FailedDeletePodReason, "Error deleting: %v", err)
@ -943,14 +946,13 @@ func maxContainerRestarts(pod *v1.Pod) int {
} }
// FilterActivePods returns pods that have not terminated. // FilterActivePods returns pods that have not terminated.
func FilterActivePods(pods []*v1.Pod) []*v1.Pod { func FilterActivePods(logger klog.Logger, pods []*v1.Pod) []*v1.Pod {
var result []*v1.Pod var result []*v1.Pod
for _, p := range pods { for _, p := range pods {
if IsPodActive(p) { if IsPodActive(p) {
result = append(result, p) result = append(result, p)
} else { } else {
klog.V(4).Infof("Ignoring inactive pod %v/%v in state %v, deletion time %v", logger.V(4).Info("Ignoring inactive pod", "pod", klog.KObj(p), "phase", p.Status.Phase, "deletionTime", p.DeletionTimestamp)
p.Namespace, p.Name, p.Status.Phase, p.DeletionTimestamp)
} }
} }
return result return result

View File

@ -51,6 +51,7 @@ import (
"k8s.io/kubernetes/pkg/controller/testutil" "k8s.io/kubernetes/pkg/controller/testutil"
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/securitycontext" "k8s.io/kubernetes/pkg/securitycontext"
"k8s.io/kubernetes/test/utils/ktesting"
testingclock "k8s.io/utils/clock/testing" testingclock "k8s.io/utils/clock/testing"
"k8s.io/utils/pointer" "k8s.io/utils/pointer"
@ -170,6 +171,7 @@ func newReplicaSet(name string, replicas int) *apps.ReplicaSet {
} }
func TestControllerExpectations(t *testing.T) { func TestControllerExpectations(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
ttl := 30 * time.Second ttl := 30 * time.Second
e, fakeClock := NewFakeControllerExpectationsLookup(ttl) e, fakeClock := NewFakeControllerExpectationsLookup(ttl)
// In practice we can't really have add and delete expectations since we only either create or // In practice we can't really have add and delete expectations since we only either create or
@ -182,26 +184,26 @@ func TestControllerExpectations(t *testing.T) {
rcKey, err := KeyFunc(rc) rcKey, err := KeyFunc(rc)
assert.NoError(t, err, "Couldn't get key for object %#v: %v", rc, err) assert.NoError(t, err, "Couldn't get key for object %#v: %v", rc, err)
e.SetExpectations(rcKey, adds, dels) e.SetExpectations(logger, rcKey, adds, dels)
var wg sync.WaitGroup var wg sync.WaitGroup
for i := 0; i < adds+1; i++ { for i := 0; i < adds+1; i++ {
wg.Add(1) wg.Add(1)
go func() { go func() {
// In prod this can happen either because of a failed create by the rc // In prod this can happen either because of a failed create by the rc
// or after having observed a create via informer // or after having observed a create via informer
e.CreationObserved(rcKey) e.CreationObserved(logger, rcKey)
wg.Done() wg.Done()
}() }()
} }
wg.Wait() wg.Wait()
// There are still delete expectations // There are still delete expectations
assert.False(t, e.SatisfiedExpectations(rcKey), "Rc will sync before expectations are met") assert.False(t, e.SatisfiedExpectations(logger, rcKey), "Rc will sync before expectations are met")
for i := 0; i < dels+1; i++ { for i := 0; i < dels+1; i++ {
wg.Add(1) wg.Add(1)
go func() { go func() {
e.DeletionObserved(rcKey) e.DeletionObserved(logger, rcKey)
wg.Done() wg.Done()
}() }()
} }
@ -215,10 +217,10 @@ func TestControllerExpectations(t *testing.T) {
add, del := podExp.GetExpectations() add, del := podExp.GetExpectations()
assert.Equal(t, int64(-1), add, "Unexpected pod expectations %#v", podExp) assert.Equal(t, int64(-1), add, "Unexpected pod expectations %#v", podExp)
assert.Equal(t, int64(-1), del, "Unexpected pod expectations %#v", podExp) assert.Equal(t, int64(-1), del, "Unexpected pod expectations %#v", podExp)
assert.True(t, e.SatisfiedExpectations(rcKey), "Expectations are met but the rc will not sync") assert.True(t, e.SatisfiedExpectations(logger, rcKey), "Expectations are met but the rc will not sync")
// Next round of rc sync, old expectations are cleared // Next round of rc sync, old expectations are cleared
e.SetExpectations(rcKey, 1, 2) e.SetExpectations(logger, rcKey, 1, 2)
podExp, exists, err = e.GetExpectations(rcKey) podExp, exists, err = e.GetExpectations(rcKey)
assert.NoError(t, err, "Could not get expectations for rc, exists %v and err %v", exists, err) assert.NoError(t, err, "Could not get expectations for rc, exists %v and err %v", exists, err)
assert.True(t, exists, "Could not get expectations for rc, exists %v and err %v", exists, err) assert.True(t, exists, "Could not get expectations for rc, exists %v and err %v", exists, err)
@ -229,11 +231,12 @@ func TestControllerExpectations(t *testing.T) {
// Expectations have expired because of ttl // Expectations have expired because of ttl
fakeClock.Step(ttl + 1) fakeClock.Step(ttl + 1)
assert.True(t, e.SatisfiedExpectations(rcKey), assert.True(t, e.SatisfiedExpectations(logger, rcKey),
"Expectations should have expired but didn't") "Expectations should have expired but didn't")
} }
func TestUIDExpectations(t *testing.T) { func TestUIDExpectations(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
uidExp := NewUIDTrackingControllerExpectations(NewControllerExpectations()) uidExp := NewUIDTrackingControllerExpectations(NewControllerExpectations())
rcList := []*v1.ReplicationController{ rcList := []*v1.ReplicationController{
newReplicationController(2), newReplicationController(2),
@ -261,24 +264,24 @@ func TestUIDExpectations(t *testing.T) {
rcPodNames = append(rcPodNames, PodKey(p)) rcPodNames = append(rcPodNames, PodKey(p))
} }
rcToPods[rcKey] = rcPodNames rcToPods[rcKey] = rcPodNames
uidExp.ExpectDeletions(rcKey, rcPodNames) uidExp.ExpectDeletions(logger, rcKey, rcPodNames)
} }
for i := range rcKeys { for i := range rcKeys {
j := rand.Intn(i + 1) j := rand.Intn(i + 1)
rcKeys[i], rcKeys[j] = rcKeys[j], rcKeys[i] rcKeys[i], rcKeys[j] = rcKeys[j], rcKeys[i]
} }
for _, rcKey := range rcKeys { for _, rcKey := range rcKeys {
assert.False(t, uidExp.SatisfiedExpectations(rcKey), assert.False(t, uidExp.SatisfiedExpectations(logger, rcKey),
"Controller %v satisfied expectations before deletion", rcKey) "Controller %v satisfied expectations before deletion", rcKey)
for _, p := range rcToPods[rcKey] { for _, p := range rcToPods[rcKey] {
uidExp.DeletionObserved(rcKey, p) uidExp.DeletionObserved(logger, rcKey, p)
} }
assert.True(t, uidExp.SatisfiedExpectations(rcKey), assert.True(t, uidExp.SatisfiedExpectations(logger, rcKey),
"Controller %v didn't satisfy expectations after deletion", rcKey) "Controller %v didn't satisfy expectations after deletion", rcKey)
uidExp.DeleteExpectations(rcKey) uidExp.DeleteExpectations(logger, rcKey)
assert.Nil(t, uidExp.GetUIDs(rcKey), assert.Nil(t, uidExp.GetUIDs(rcKey),
"Failed to delete uid expectations for %v", rcKey) "Failed to delete uid expectations for %v", rcKey)
@ -378,6 +381,7 @@ func TestDeletePodsAllowsMissing(t *testing.T) {
} }
func TestActivePodFiltering(t *testing.T) { func TestActivePodFiltering(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
// This rc is not needed by the test, only the newPodList to give the pods labels/a namespace. // This rc is not needed by the test, only the newPodList to give the pods labels/a namespace.
rc := newReplicationController(0) rc := newReplicationController(0)
podList := newPodList(nil, 5, v1.PodRunning, rc) podList := newPodList(nil, 5, v1.PodRunning, rc)
@ -392,7 +396,7 @@ func TestActivePodFiltering(t *testing.T) {
for i := range podList.Items { for i := range podList.Items {
podPointers = append(podPointers, &podList.Items[i]) podPointers = append(podPointers, &podList.Items[i])
} }
got := FilterActivePods(podPointers) got := FilterActivePods(logger, podPointers)
gotNames := sets.NewString() gotNames := sets.NewString()
for _, pod := range got { for _, pod := range got {
gotNames.Insert(pod.Name) gotNames.Insert(pod.Name)

View File

@ -272,7 +272,7 @@ func (dsc *DaemonSetsController) deleteDaemonset(logger klog.Logger, obj interfa
} }
// Delete expectations for the DaemonSet so if we create a new one with the same name it starts clean // Delete expectations for the DaemonSet so if we create a new one with the same name it starts clean
dsc.expectations.DeleteExpectations(key) dsc.expectations.DeleteExpectations(logger, key)
dsc.queue.Add(key) dsc.queue.Add(key)
} }
@ -518,7 +518,7 @@ func (dsc *DaemonSetsController) addPod(logger klog.Logger, obj interface{}) {
return return
} }
logger.V(4).Info("Pod added", "pod", klog.KObj(pod)) logger.V(4).Info("Pod added", "pod", klog.KObj(pod))
dsc.expectations.CreationObserved(dsKey) dsc.expectations.CreationObserved(logger, dsKey)
dsc.enqueueDaemonSet(ds) dsc.enqueueDaemonSet(ds)
return return
} }
@ -635,7 +635,7 @@ func (dsc *DaemonSetsController) deletePod(logger klog.Logger, obj interface{})
return return
} }
logger.V(4).Info("Pod deleted", "pod", klog.KObj(pod)) logger.V(4).Info("Pod deleted", "pod", klog.KObj(pod))
dsc.expectations.DeletionObserved(dsKey) dsc.expectations.DeletionObserved(logger, dsKey)
dsc.enqueueDaemonSet(ds) dsc.enqueueDaemonSet(ds)
} }
@ -934,7 +934,7 @@ func (dsc *DaemonSetsController) updateDaemonSet(ctx context.Context, ds *apps.D
} }
// Process rolling updates if we're ready. // Process rolling updates if we're ready.
if dsc.expectations.SatisfiedExpectations(key) { if dsc.expectations.SatisfiedExpectations(klog.FromContext(ctx), key) {
switch ds.Spec.UpdateStrategy.Type { switch ds.Spec.UpdateStrategy.Type {
case apps.OnDeleteDaemonSetStrategyType: case apps.OnDeleteDaemonSetStrategyType:
case apps.RollingUpdateDaemonSetStrategyType: case apps.RollingUpdateDaemonSetStrategyType:
@ -1008,7 +1008,7 @@ func (dsc *DaemonSetsController) syncNodes(ctx context.Context, ds *apps.DaemonS
deleteDiff = dsc.burstReplicas deleteDiff = dsc.burstReplicas
} }
dsc.expectations.SetExpectations(dsKey, createDiff, deleteDiff) dsc.expectations.SetExpectations(logger, dsKey, createDiff, deleteDiff)
// error channel to communicate back failures. make the buffer big enough to avoid any blocking // error channel to communicate back failures. make the buffer big enough to avoid any blocking
errCh := make(chan error, createDiff+deleteDiff) errCh := make(chan error, createDiff+deleteDiff)
@ -1057,7 +1057,7 @@ func (dsc *DaemonSetsController) syncNodes(ctx context.Context, ds *apps.DaemonS
} }
if err != nil { if err != nil {
logger.V(2).Info("Failed creation, decrementing expectations for daemon set", "daemonset", klog.KObj(ds)) logger.V(2).Info("Failed creation, decrementing expectations for daemon set", "daemonset", klog.KObj(ds))
dsc.expectations.CreationObserved(dsKey) dsc.expectations.CreationObserved(logger, dsKey)
errCh <- err errCh <- err
utilruntime.HandleError(err) utilruntime.HandleError(err)
} }
@ -1068,7 +1068,7 @@ func (dsc *DaemonSetsController) syncNodes(ctx context.Context, ds *apps.DaemonS
skippedPods := createDiff - (batchSize + pos) skippedPods := createDiff - (batchSize + pos)
if errorCount < len(errCh) && skippedPods > 0 { if errorCount < len(errCh) && skippedPods > 0 {
logger.V(2).Info("Slow-start failure. Skipping creation pods, decrementing expectations for daemon set", "skippedPods", skippedPods, "daemonset", klog.KObj(ds)) logger.V(2).Info("Slow-start failure. Skipping creation pods, decrementing expectations for daemon set", "skippedPods", skippedPods, "daemonset", klog.KObj(ds))
dsc.expectations.LowerExpectations(dsKey, skippedPods, 0) dsc.expectations.LowerExpectations(logger, dsKey, skippedPods, 0)
// The skipped pods will be retried later. The next controller resync will // The skipped pods will be retried later. The next controller resync will
// retry the slow start process. // retry the slow start process.
break break
@ -1082,7 +1082,7 @@ func (dsc *DaemonSetsController) syncNodes(ctx context.Context, ds *apps.DaemonS
go func(ix int) { go func(ix int) {
defer deleteWait.Done() defer deleteWait.Done()
if err := dsc.podControl.DeletePod(ctx, ds.Namespace, podsToDelete[ix], ds); err != nil { if err := dsc.podControl.DeletePod(ctx, ds.Namespace, podsToDelete[ix], ds); err != nil {
dsc.expectations.DeletionObserved(dsKey) dsc.expectations.DeletionObserved(logger, dsKey)
if !apierrors.IsNotFound(err) { if !apierrors.IsNotFound(err) {
logger.V(2).Info("Failed deletion, decremented expectations for daemon set", "daemonset", klog.KObj(ds)) logger.V(2).Info("Failed deletion, decremented expectations for daemon set", "daemonset", klog.KObj(ds))
errCh <- err errCh <- err
@ -1232,7 +1232,7 @@ func (dsc *DaemonSetsController) syncDaemonSet(ctx context.Context, key string)
ds, err := dsc.dsLister.DaemonSets(namespace).Get(name) ds, err := dsc.dsLister.DaemonSets(namespace).Get(name)
if apierrors.IsNotFound(err) { if apierrors.IsNotFound(err) {
logger.V(3).Info("Daemon set has been deleted", "daemonset", key) logger.V(3).Info("Daemon set has been deleted", "daemonset", key)
dsc.expectations.DeleteExpectations(key) dsc.expectations.DeleteExpectations(logger, key)
return nil return nil
} }
if err != nil { if err != nil {
@ -1277,7 +1277,7 @@ func (dsc *DaemonSetsController) syncDaemonSet(ctx context.Context, key string)
} }
hash := cur.Labels[apps.DefaultDaemonSetUniqueLabelKey] hash := cur.Labels[apps.DefaultDaemonSetUniqueLabelKey]
if !dsc.expectations.SatisfiedExpectations(dsKey) { if !dsc.expectations.SatisfiedExpectations(logger, dsKey) {
// Only update status. Don't raise observedGeneration since controller didn't process object of that generation. // Only update status. Don't raise observedGeneration since controller didn't process object of that generation.
return dsc.updateDaemonSetStatus(ctx, ds, nodeList, hash, false) return dsc.updateDaemonSetStatus(ctx, ds, nodeList, hash, false)
} }

View File

@ -44,6 +44,7 @@ import (
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol" "k8s.io/client-go/util/flowcontrol"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting" "k8s.io/klog/v2/ktesting"
podutil "k8s.io/kubernetes/pkg/api/v1/pod" podutil "k8s.io/kubernetes/pkg/api/v1/pod"
api "k8s.io/kubernetes/pkg/apis/core" api "k8s.io/kubernetes/pkg/apis/core"
@ -275,7 +276,7 @@ func (f *fakePodControl) CreatePods(ctx context.Context, namespace string, templ
ds := object.(*apps.DaemonSet) ds := object.(*apps.DaemonSet)
dsKey, _ := controller.KeyFunc(ds) dsKey, _ := controller.KeyFunc(ds)
f.expectations.CreationObserved(dsKey) f.expectations.CreationObserved(klog.FromContext(ctx), dsKey)
return nil return nil
} }
@ -295,7 +296,7 @@ func (f *fakePodControl) DeletePod(ctx context.Context, namespace string, podID
ds := object.(*apps.DaemonSet) ds := object.(*apps.DaemonSet)
dsKey, _ := controller.KeyFunc(ds) dsKey, _ := controller.KeyFunc(ds)
f.expectations.DeletionObserved(dsKey) f.expectations.DeletionObserved(klog.FromContext(ctx), dsKey)
return nil return nil
} }
@ -424,7 +425,7 @@ func clearExpectations(t *testing.T, manager *daemonSetsController, ds *apps.Dae
t.Errorf("Could not get key for daemon.") t.Errorf("Could not get key for daemon.")
return return
} }
manager.expectations.DeleteExpectations(key) manager.expectations.DeleteExpectations(logger, key)
now := manager.failedPodsBackoff.Clock.Now() now := manager.failedPodsBackoff.Clock.Now()
hash, _ := currentDSHash(manager, ds) hash, _ := currentDSHash(manager, ds)
@ -836,6 +837,7 @@ func TestSimpleDaemonSetPodCreateErrors(t *testing.T) {
} }
func TestDaemonSetPodCreateExpectationsError(t *testing.T) { func TestDaemonSetPodCreateExpectationsError(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
strategies := updateStrategies() strategies := updateStrategies()
for _, strategy := range strategies { for _, strategy := range strategies {
ds := newDaemonSet("foo") ds := newDaemonSet("foo")
@ -860,7 +862,7 @@ func TestDaemonSetPodCreateExpectationsError(t *testing.T) {
t.Fatalf("error get DaemonSets controller key: %v", err) t.Fatalf("error get DaemonSets controller key: %v", err)
} }
if !manager.expectations.SatisfiedExpectations(dsKey) { if !manager.expectations.SatisfiedExpectations(logger, dsKey) {
t.Errorf("Unsatisfied pod creation expectations. Expected %d", creationExpectations) t.Errorf("Unsatisfied pod creation expectations. Expected %d", creationExpectations)
} }
} }

View File

@ -67,13 +67,14 @@ const (
) )
// NewController creates and initializes a new Controller // NewController creates and initializes a new Controller
func NewController(endpointsInformer coreinformers.EndpointsInformer, func NewController(ctx context.Context, endpointsInformer coreinformers.EndpointsInformer,
endpointSliceInformer discoveryinformers.EndpointSliceInformer, endpointSliceInformer discoveryinformers.EndpointSliceInformer,
serviceInformer coreinformers.ServiceInformer, serviceInformer coreinformers.ServiceInformer,
maxEndpointsPerSubset int32, maxEndpointsPerSubset int32,
client clientset.Interface, client clientset.Interface,
endpointUpdatesBatchPeriod time.Duration, endpointUpdatesBatchPeriod time.Duration,
) *Controller { ) *Controller {
logger := klog.FromContext(ctx)
broadcaster := record.NewBroadcaster() broadcaster := record.NewBroadcaster()
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-slice-mirroring-controller"}) recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-slice-mirroring-controller"})
@ -96,16 +97,24 @@ func NewController(endpointsInformer coreinformers.EndpointsInformer,
} }
endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.onEndpointsAdd, AddFunc: func(obj interface{}) {
UpdateFunc: c.onEndpointsUpdate, c.onEndpointsAdd(logger, obj)
DeleteFunc: c.onEndpointsDelete, },
UpdateFunc: func(oldObj, newObj interface{}) {
c.onEndpointsUpdate(logger, oldObj, newObj)
},
DeleteFunc: func(obj interface{}) {
c.onEndpointsDelete(logger, obj)
},
}) })
c.endpointsLister = endpointsInformer.Lister() c.endpointsLister = endpointsInformer.Lister()
c.endpointsSynced = endpointsInformer.Informer().HasSynced c.endpointsSynced = endpointsInformer.Informer().HasSynced
endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.onEndpointSliceAdd, AddFunc: c.onEndpointSliceAdd,
UpdateFunc: c.onEndpointSliceUpdate, UpdateFunc: func(oldObj, newObj interface{}) {
c.onEndpointSliceUpdate(logger, oldObj, newObj)
},
DeleteFunc: c.onEndpointSliceDelete, DeleteFunc: c.onEndpointSliceDelete,
}) })
@ -393,21 +402,21 @@ func (c *Controller) onServiceDelete(obj interface{}) {
} }
// onEndpointsAdd queues a sync for the relevant Endpoints resource. // onEndpointsAdd queues a sync for the relevant Endpoints resource.
func (c *Controller) onEndpointsAdd(obj interface{}) { func (c *Controller) onEndpointsAdd(logger klog.Logger, obj interface{}) {
endpoints := obj.(*v1.Endpoints) endpoints := obj.(*v1.Endpoints)
if endpoints == nil { if endpoints == nil {
utilruntime.HandleError(fmt.Errorf("onEndpointsAdd() expected type v1.Endpoints, got %T", obj)) utilruntime.HandleError(fmt.Errorf("onEndpointsAdd() expected type v1.Endpoints, got %T", obj))
return return
} }
if !c.shouldMirror(endpoints) { if !c.shouldMirror(endpoints) {
klog.V(5).Infof("Skipping mirroring for %s/%s", endpoints.Namespace, endpoints.Name) logger.V(5).Info("Skipping mirroring", "endpoints", klog.KObj(endpoints))
return return
} }
c.queueEndpoints(obj) c.queueEndpoints(obj)
} }
// onEndpointsUpdate queues a sync for the relevant Endpoints resource. // onEndpointsUpdate queues a sync for the relevant Endpoints resource.
func (c *Controller) onEndpointsUpdate(prevObj, obj interface{}) { func (c *Controller) onEndpointsUpdate(logger klog.Logger, prevObj, obj interface{}) {
endpoints := obj.(*v1.Endpoints) endpoints := obj.(*v1.Endpoints)
prevEndpoints := prevObj.(*v1.Endpoints) prevEndpoints := prevObj.(*v1.Endpoints)
if endpoints == nil || prevEndpoints == nil { if endpoints == nil || prevEndpoints == nil {
@ -415,21 +424,21 @@ func (c *Controller) onEndpointsUpdate(prevObj, obj interface{}) {
return return
} }
if !c.shouldMirror(endpoints) && !c.shouldMirror(prevEndpoints) { if !c.shouldMirror(endpoints) && !c.shouldMirror(prevEndpoints) {
klog.V(5).Infof("Skipping mirroring for %s/%s", endpoints.Namespace, endpoints.Name) logger.V(5).Info("Skipping mirroring", "endpoints", klog.KObj(endpoints))
return return
} }
c.queueEndpoints(obj) c.queueEndpoints(obj)
} }
// onEndpointsDelete queues a sync for the relevant Endpoints resource. // onEndpointsDelete queues a sync for the relevant Endpoints resource.
func (c *Controller) onEndpointsDelete(obj interface{}) { func (c *Controller) onEndpointsDelete(logger klog.Logger, obj interface{}) {
endpoints := getEndpointsFromDeleteAction(obj) endpoints := getEndpointsFromDeleteAction(obj)
if endpoints == nil { if endpoints == nil {
utilruntime.HandleError(fmt.Errorf("onEndpointsDelete() expected type v1.Endpoints, got %T", obj)) utilruntime.HandleError(fmt.Errorf("onEndpointsDelete() expected type v1.Endpoints, got %T", obj))
return return
} }
if !c.shouldMirror(endpoints) { if !c.shouldMirror(endpoints) {
klog.V(5).Infof("Skipping mirroring for %s/%s", endpoints.Namespace, endpoints.Name) logger.V(5).Info("Skipping mirroring", "endpoints", klog.KObj(endpoints))
return return
} }
c.queueEndpoints(obj) c.queueEndpoints(obj)
@ -453,7 +462,7 @@ func (c *Controller) onEndpointSliceAdd(obj interface{}) {
// sync if the EndpointSlice resource version does not match the expected // sync if the EndpointSlice resource version does not match the expected
// version in the endpointSliceTracker or the managed-by value of the // version in the endpointSliceTracker or the managed-by value of the
// EndpointSlice has changed from or to this controller. // EndpointSlice has changed from or to this controller.
func (c *Controller) onEndpointSliceUpdate(prevObj, obj interface{}) { func (c *Controller) onEndpointSliceUpdate(logger klog.Logger, prevObj, obj interface{}) {
prevEndpointSlice := obj.(*discovery.EndpointSlice) prevEndpointSlice := obj.(*discovery.EndpointSlice)
endpointSlice := prevObj.(*discovery.EndpointSlice) endpointSlice := prevObj.(*discovery.EndpointSlice)
if endpointSlice == nil || prevEndpointSlice == nil { if endpointSlice == nil || prevEndpointSlice == nil {
@ -466,7 +475,7 @@ func (c *Controller) onEndpointSliceUpdate(prevObj, obj interface{}) {
svcName := endpointSlice.Labels[discovery.LabelServiceName] svcName := endpointSlice.Labels[discovery.LabelServiceName]
prevSvcName := prevEndpointSlice.Labels[discovery.LabelServiceName] prevSvcName := prevEndpointSlice.Labels[discovery.LabelServiceName]
if svcName != prevSvcName { if svcName != prevSvcName {
klog.Warningf("%s label changed from %s to %s for %s", discovery.LabelServiceName, prevSvcName, svcName, endpointSlice.Name) logger.Info("LabelServiceName changed", "labelServiceName", discovery.LabelServiceName, "oldName", prevSvcName, "newName", svcName, "endpointSlice", klog.KObj(endpointSlice))
c.queueEndpointsForEndpointSlice(endpointSlice) c.queueEndpointsForEndpointSlice(endpointSlice)
c.queueEndpointsForEndpointSlice(prevEndpointSlice) c.queueEndpointsForEndpointSlice(prevEndpointSlice)
return return

View File

@ -49,11 +49,12 @@ type endpointSliceMirroringController struct {
serviceStore cache.Store serviceStore cache.Store
} }
func newController(batchPeriod time.Duration) (*fake.Clientset, *endpointSliceMirroringController) { func newController(ctx context.Context, batchPeriod time.Duration) (*fake.Clientset, *endpointSliceMirroringController) {
client := newClientset() client := newClientset()
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
esController := NewController( esController := NewController(
ctx,
informerFactory.Core().V1().Endpoints(), informerFactory.Core().V1().Endpoints(),
informerFactory.Discovery().V1().EndpointSlices(), informerFactory.Discovery().V1().EndpointSlices(),
informerFactory.Core().V1().Services(), informerFactory.Core().V1().Services(),
@ -224,7 +225,8 @@ func TestSyncEndpoints(t *testing.T) {
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.testName, func(t *testing.T) { t.Run(tc.testName, func(t *testing.T) {
client, esController := newController(time.Duration(0)) _, ctx := ktesting.NewTestContext(t)
client, esController := newController(ctx, time.Duration(0))
tc.endpoints.Name = endpointsName tc.endpoints.Name = endpointsName
tc.endpoints.Namespace = namespace tc.endpoints.Namespace = namespace
esController.endpointsStore.Add(tc.endpoints) esController.endpointsStore.Add(tc.endpoints)
@ -320,7 +322,8 @@ func TestShouldMirror(t *testing.T) {
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.testName, func(t *testing.T) { t.Run(tc.testName, func(t *testing.T) {
_, c := newController(time.Duration(0)) _, ctx := ktesting.NewTestContext(t)
_, c := newController(ctx, time.Duration(0))
if tc.endpoints != nil { if tc.endpoints != nil {
err := c.endpointsStore.Add(tc.endpoints) err := c.endpointsStore.Add(tc.endpoints)
@ -437,7 +440,8 @@ func TestEndpointSlicesMirroredForService(t *testing.T) {
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.testName, func(t *testing.T) { t.Run(tc.testName, func(t *testing.T) {
_, c := newController(time.Duration(0)) _, ctx := ktesting.NewTestContext(t)
_, c := newController(ctx, time.Duration(0))
err := c.endpointSliceStore.Add(tc.endpointSlice) err := c.endpointSliceStore.Add(tc.endpointSlice)
if err != nil { if err != nil {

View File

@ -296,7 +296,7 @@ func (jm *Controller) addPod(logger klog.Logger, obj interface{}) {
if err != nil { if err != nil {
return return
} }
jm.expectations.CreationObserved(jobKey) jm.expectations.CreationObserved(logger, jobKey)
jm.enqueueSyncJobBatched(logger, job) jm.enqueueSyncJobBatched(logger, job)
return return
} }
@ -436,7 +436,7 @@ func (jm *Controller) deletePod(logger klog.Logger, obj interface{}, final bool)
if err != nil { if err != nil {
return return
} }
jm.expectations.DeletionObserved(jobKey) jm.expectations.DeletionObserved(logger, jobKey)
// Consider the finalizer removed if this is the final delete. Otherwise, // Consider the finalizer removed if this is the final delete. Otherwise,
// it's an update for the deletion timestamp, then check finalizer. // it's an update for the deletion timestamp, then check finalizer.
@ -725,7 +725,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
if err != nil { if err != nil {
if apierrors.IsNotFound(err) { if apierrors.IsNotFound(err) {
logger.V(4).Info("Job has been deleted", "key", key) logger.V(4).Info("Job has been deleted", "key", key)
jm.expectations.DeleteExpectations(key) jm.expectations.DeleteExpectations(logger, key)
jm.finalizerExpectations.deleteExpectations(logger, key) jm.finalizerExpectations.deleteExpectations(logger, key)
err := jm.podBackoffStore.removeBackoffRecord(key) err := jm.podBackoffStore.removeBackoffRecord(key)
@ -775,7 +775,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
// Check the expectations of the job before counting active pods, otherwise a new pod can sneak in // Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
// and update the expectations after we've retrieved active pods from the store. If a new pod enters // and update the expectations after we've retrieved active pods from the store. If a new pod enters
// the store after we've checked the expectation, the job sync is just deferred till the next relist. // the store after we've checked the expectation, the job sync is just deferred till the next relist.
satisfiedExpectations := jm.expectations.SatisfiedExpectations(key) satisfiedExpectations := jm.expectations.SatisfiedExpectations(logger, key)
pods, err := jm.getPodsForJob(ctx, &job) pods, err := jm.getPodsForJob(ctx, &job)
if err != nil { if err != nil {
@ -785,7 +785,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
jobCtx := &syncJobCtx{ jobCtx := &syncJobCtx{
job: &job, job: &job,
pods: pods, pods: pods,
activePods: controller.FilterActivePods(pods), activePods: controller.FilterActivePods(logger, pods),
uncounted: newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods), uncounted: newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods),
expectedRmFinalizers: jm.finalizerExpectations.getExpectedUIDs(key), expectedRmFinalizers: jm.finalizerExpectations.getExpectedUIDs(key),
} }
@ -947,7 +947,7 @@ func (jm *Controller) deleteJobPods(ctx context.Context, job *batch.Job, jobKey
failDelete := func(pod *v1.Pod, err error) { failDelete := func(pod *v1.Pod, err error) {
// Decrement the expected number of deletes because the informer won't observe this deletion // Decrement the expected number of deletes because the informer won't observe this deletion
jm.expectations.DeletionObserved(jobKey) jm.expectations.DeletionObserved(logger, jobKey)
if !apierrors.IsNotFound(err) { if !apierrors.IsNotFound(err) {
logger.V(2).Info("Failed to delete Pod", "job", klog.KObj(job), "pod", klog.KObj(pod), "err", err) logger.V(2).Info("Failed to delete Pod", "job", klog.KObj(job), "pod", klog.KObj(pod), "err", err)
atomic.AddInt32(&successfulDeletes, -1) atomic.AddInt32(&successfulDeletes, -1)
@ -1394,7 +1394,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
if jobSuspended(job) { if jobSuspended(job) {
logger.V(4).Info("Deleting all active pods in suspended job", "job", klog.KObj(job), "active", active) logger.V(4).Info("Deleting all active pods in suspended job", "job", klog.KObj(job), "active", active)
podsToDelete := activePodsForRemoval(job, jobCtx.activePods, int(active)) podsToDelete := activePodsForRemoval(job, jobCtx.activePods, int(active))
jm.expectations.ExpectDeletions(jobKey, len(podsToDelete)) jm.expectations.ExpectDeletions(logger, jobKey, len(podsToDelete))
removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete) removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
active -= removed active -= removed
return active, metrics.JobSyncActionPodsDeleted, err return active, metrics.JobSyncActionPodsDeleted, err
@ -1431,7 +1431,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
podsToDelete = podsToDelete[:MaxPodCreateDeletePerSync] podsToDelete = podsToDelete[:MaxPodCreateDeletePerSync]
} }
if len(podsToDelete) > 0 { if len(podsToDelete) > 0 {
jm.expectations.ExpectDeletions(jobKey, len(podsToDelete)) jm.expectations.ExpectDeletions(logger, jobKey, len(podsToDelete))
logger.V(4).Info("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", wantActive) logger.V(4).Info("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", wantActive)
removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete) removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
active -= removed active -= removed
@ -1453,7 +1453,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
diff = int32(MaxPodCreateDeletePerSync) diff = int32(MaxPodCreateDeletePerSync)
} }
jm.expectations.ExpectCreations(jobKey, int(diff)) jm.expectations.ExpectCreations(logger, jobKey, int(diff))
errCh := make(chan error, diff) errCh := make(chan error, diff)
logger.V(4).Info("Too few pods running", "key", jobKey, "need", wantActive, "creating", diff) logger.V(4).Info("Too few pods running", "key", jobKey, "need", wantActive, "creating", diff)
@ -1511,7 +1511,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
defer utilruntime.HandleError(err) defer utilruntime.HandleError(err)
// Decrement the expected number of creates because the informer won't observe this pod // Decrement the expected number of creates because the informer won't observe this pod
logger.V(2).Info("Failed creation, decrementing expectations", "job", klog.KObj(job)) logger.V(2).Info("Failed creation, decrementing expectations", "job", klog.KObj(job))
jm.expectations.CreationObserved(jobKey) jm.expectations.CreationObserved(logger, jobKey)
atomic.AddInt32(&active, -1) atomic.AddInt32(&active, -1)
errCh <- err errCh <- err
} }
@ -1525,7 +1525,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
active -= skippedPods active -= skippedPods
for i := int32(0); i < skippedPods; i++ { for i := int32(0); i < skippedPods; i++ {
// Decrement the expected number of creates because the informer won't observe this pod // Decrement the expected number of creates because the informer won't observe this pod
jm.expectations.CreationObserved(jobKey) jm.expectations.CreationObserved(logger, jobKey)
} }
// The skipped pods will be retried later. The next controller resync will // The skipped pods will be retried later. The next controller resync will
// retry the slow start process. // retry the slow start process.

View File

@ -785,6 +785,7 @@ func TestControllerSyncJob(t *testing.T) {
for name, tc := range testCases { for name, tc := range testCases {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.jobReadyPodsEnabled)() defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.jobReadyPodsEnabled)()
// job manager setup // job manager setup
@ -828,9 +829,9 @@ func TestControllerSyncJob(t *testing.T) {
manager.podBackoffStore.updateBackoffRecord(*tc.backoffRecord) manager.podBackoffStore.updateBackoffRecord(*tc.backoffRecord)
} }
if tc.fakeExpectationAtCreation < 0 { if tc.fakeExpectationAtCreation < 0 {
manager.expectations.ExpectDeletions(key, int(-tc.fakeExpectationAtCreation)) manager.expectations.ExpectDeletions(logger, key, int(-tc.fakeExpectationAtCreation))
} else if tc.fakeExpectationAtCreation > 0 { } else if tc.fakeExpectationAtCreation > 0 {
manager.expectations.ExpectCreations(key, int(tc.fakeExpectationAtCreation)) manager.expectations.ExpectCreations(logger, key, int(tc.fakeExpectationAtCreation))
} }
if tc.wasSuspended { if tc.wasSuspended {
job.Status.Conditions = append(job.Status.Conditions, *newCondition(batch.JobSuspended, v1.ConditionTrue, "JobSuspended", "Job suspended", realClock.Now())) job.Status.Conditions = append(job.Status.Conditions, *newCondition(batch.JobSuspended, v1.ConditionTrue, "JobSuspended", "Job suspended", realClock.Now()))
@ -3673,7 +3674,7 @@ type FakeJobExpectations struct {
expSatisfied func() expSatisfied func()
} }
func (fe FakeJobExpectations) SatisfiedExpectations(controllerKey string) bool { func (fe FakeJobExpectations) SatisfiedExpectations(logger klog.Logger, controllerKey string) bool {
fe.expSatisfied() fe.expSatisfied()
return fe.satisfied return fe.satisfied
} }

View File

@ -210,8 +210,9 @@ func (rsc *ReplicaSetController) Run(ctx context.Context, workers int) {
defer rsc.queue.ShutDown() defer rsc.queue.ShutDown()
controllerName := strings.ToLower(rsc.Kind) controllerName := strings.ToLower(rsc.Kind)
klog.FromContext(ctx).Info("Starting controller", "name", controllerName) logger := klog.FromContext(ctx)
defer klog.FromContext(ctx).Info("Shutting down controller", "name", controllerName) logger.Info("Starting controller", "name", controllerName)
defer logger.Info("Shutting down controller", "name", controllerName)
if !cache.WaitForNamedCacheSync(rsc.Kind, ctx.Done(), rsc.podListerSynced, rsc.rsListerSynced) { if !cache.WaitForNamedCacheSync(rsc.Kind, ctx.Done(), rsc.podListerSynced, rsc.rsListerSynced) {
return return
@ -371,7 +372,7 @@ func (rsc *ReplicaSetController) deleteRS(logger klog.Logger, obj interface{}) {
logger.V(4).Info("Deleting", "replicaSet", klog.KObj(rs)) logger.V(4).Info("Deleting", "replicaSet", klog.KObj(rs))
// Delete expectations for the ReplicaSet so if we create a new one with the same name it starts clean // Delete expectations for the ReplicaSet so if we create a new one with the same name it starts clean
rsc.expectations.DeleteExpectations(key) rsc.expectations.DeleteExpectations(logger, key)
rsc.queue.Add(key) rsc.queue.Add(key)
} }
@ -398,7 +399,7 @@ func (rsc *ReplicaSetController) addPod(logger klog.Logger, obj interface{}) {
return return
} }
logger.V(4).Info("Pod created", "pod", klog.KObj(pod), "detail", pod) logger.V(4).Info("Pod created", "pod", klog.KObj(pod), "detail", pod)
rsc.expectations.CreationObserved(rsKey) rsc.expectations.CreationObserved(logger, rsKey)
rsc.queue.Add(rsKey) rsc.queue.Add(rsKey)
return return
} }
@ -529,7 +530,7 @@ func (rsc *ReplicaSetController) deletePod(logger klog.Logger, obj interface{})
return return
} }
logger.V(4).Info("Pod deleted", "delete_by", utilruntime.GetCaller(), "deletion_timestamp", pod.DeletionTimestamp, "pod", klog.KObj(pod)) logger.V(4).Info("Pod deleted", "delete_by", utilruntime.GetCaller(), "deletion_timestamp", pod.DeletionTimestamp, "pod", klog.KObj(pod))
rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod)) rsc.expectations.DeletionObserved(logger, rsKey, controller.PodKey(pod))
rsc.queue.Add(rsKey) rsc.queue.Add(rsKey)
} }
@ -569,6 +570,7 @@ func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPod
utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err)) utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
return nil return nil
} }
logger := klog.FromContext(ctx)
if diff < 0 { if diff < 0 {
diff *= -1 diff *= -1
if diff > rsc.burstReplicas { if diff > rsc.burstReplicas {
@ -579,8 +581,8 @@ func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPod
// UID, which would require locking *across* the create, which will turn // UID, which would require locking *across* the create, which will turn
// into a performance bottleneck. We should generate a UID for the pod // into a performance bottleneck. We should generate a UID for the pod
// beforehand and store it via ExpectCreations. // beforehand and store it via ExpectCreations.
rsc.expectations.ExpectCreations(rsKey, diff) rsc.expectations.ExpectCreations(logger, rsKey, diff)
klog.FromContext(ctx).V(2).Info("Too few replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "creating", diff) logger.V(2).Info("Too few replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "creating", diff)
// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize // Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
// and double with each successful iteration in a kind of "slow start". // and double with each successful iteration in a kind of "slow start".
// This handles attempts to start large numbers of pods that would // This handles attempts to start large numbers of pods that would
@ -605,10 +607,10 @@ func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPod
// The skipped pods will be retried later. The next controller resync will // The skipped pods will be retried later. The next controller resync will
// retry the slow start process. // retry the slow start process.
if skippedPods := diff - successfulCreations; skippedPods > 0 { if skippedPods := diff - successfulCreations; skippedPods > 0 {
klog.FromContext(ctx).V(2).Info("Slow-start failure. Skipping creation of pods, decrementing expectations", "podsSkipped", skippedPods, "kind", rsc.Kind, "replicaSet", klog.KObj(rs)) logger.V(2).Info("Slow-start failure. Skipping creation of pods, decrementing expectations", "podsSkipped", skippedPods, "kind", rsc.Kind, "replicaSet", klog.KObj(rs))
for i := 0; i < skippedPods; i++ { for i := 0; i < skippedPods; i++ {
// Decrement the expected number of creates because the informer won't observe this pod // Decrement the expected number of creates because the informer won't observe this pod
rsc.expectations.CreationObserved(rsKey) rsc.expectations.CreationObserved(logger, rsKey)
} }
} }
return err return err
@ -616,9 +618,9 @@ func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPod
if diff > rsc.burstReplicas { if diff > rsc.burstReplicas {
diff = rsc.burstReplicas diff = rsc.burstReplicas
} }
klog.FromContext(ctx).V(2).Info("Too many replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "deleting", diff) logger.V(2).Info("Too many replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "deleting", diff)
relatedPods, err := rsc.getIndirectlyRelatedPods(klog.FromContext(ctx), rs) relatedPods, err := rsc.getIndirectlyRelatedPods(logger, rs)
utilruntime.HandleError(err) utilruntime.HandleError(err)
// Choose which Pods to delete, preferring those in earlier phases of startup. // Choose which Pods to delete, preferring those in earlier phases of startup.
@ -630,7 +632,7 @@ func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPod
// Note that if the labels on a pod/rs change in a way that the pod gets // Note that if the labels on a pod/rs change in a way that the pod gets
// orphaned, the rs will only wake up after the expectations have // orphaned, the rs will only wake up after the expectations have
// expired even if other pods are deleted. // expired even if other pods are deleted.
rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete)) rsc.expectations.ExpectDeletions(logger, rsKey, getPodKeys(podsToDelete))
errCh := make(chan error, diff) errCh := make(chan error, diff)
var wg sync.WaitGroup var wg sync.WaitGroup
@ -641,9 +643,9 @@ func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPod
if err := rsc.podControl.DeletePod(ctx, rs.Namespace, targetPod.Name, rs); err != nil { if err := rsc.podControl.DeletePod(ctx, rs.Namespace, targetPod.Name, rs); err != nil {
// Decrement the expected number of deletes because the informer won't observe this deletion // Decrement the expected number of deletes because the informer won't observe this deletion
podKey := controller.PodKey(targetPod) podKey := controller.PodKey(targetPod)
rsc.expectations.DeletionObserved(rsKey, podKey) rsc.expectations.DeletionObserved(logger, rsKey, podKey)
if !apierrors.IsNotFound(err) { if !apierrors.IsNotFound(err) {
klog.FromContext(ctx).V(2).Info("Failed to delete pod, decremented expectations", "pod", podKey, "kind", rsc.Kind, "replicaSet", klog.KObj(rs)) logger.V(2).Info("Failed to delete pod, decremented expectations", "pod", podKey, "kind", rsc.Kind, "replicaSet", klog.KObj(rs))
errCh <- err errCh <- err
} }
} }
@ -668,9 +670,10 @@ func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPod
// meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be // meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be
// invoked concurrently with the same key. // invoked concurrently with the same key.
func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string) error { func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string) error {
logger := klog.FromContext(ctx)
startTime := time.Now() startTime := time.Now()
defer func() { defer func() {
klog.FromContext(ctx).V(4).Info("Finished syncing", "kind", rsc.Kind, "key", key, "duration", time.Since(startTime)) logger.Info("Finished syncing", "kind", rsc.Kind, "key", key, "duration", time.Since(startTime))
}() }()
namespace, name, err := cache.SplitMetaNamespaceKey(key) namespace, name, err := cache.SplitMetaNamespaceKey(key)
@ -679,15 +682,15 @@ func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string)
} }
rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name) rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
if apierrors.IsNotFound(err) { if apierrors.IsNotFound(err) {
klog.FromContext(ctx).V(4).Info("deleted", "kind", rsc.Kind, "key", key) logger.V(4).Info("deleted", "kind", rsc.Kind, "key", key)
rsc.expectations.DeleteExpectations(key) rsc.expectations.DeleteExpectations(logger, key)
return nil return nil
} }
if err != nil { if err != nil {
return err return err
} }
rsNeedsSync := rsc.expectations.SatisfiedExpectations(key) rsNeedsSync := rsc.expectations.SatisfiedExpectations(logger, key)
selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector) selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
if err != nil { if err != nil {
utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector for rs %v/%v: %v", namespace, name, err)) utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector for rs %v/%v: %v", namespace, name, err))
@ -702,7 +705,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string)
return err return err
} }
// Ignore inactive pods. // Ignore inactive pods.
filteredPods := controller.FilterActivePods(allPods) filteredPods := controller.FilterActivePods(logger, allPods)
// NOTE: filteredPods are pointing to objects from cache - if you need to // NOTE: filteredPods are pointing to objects from cache - if you need to
// modify them, you need to copy it first. // modify them, you need to copy it first.
@ -719,7 +722,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string)
newStatus := calculateStatus(rs, filteredPods, manageReplicasErr) newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
// Always updates status as pods come up or die. // Always updates status as pods come up or die.
updatedRS, err := updateReplicaSetStatus(klog.FromContext(ctx), rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus) updatedRS, err := updateReplicaSetStatus(logger, rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
if err != nil { if err != nil {
// Multiple things could lead to this update failing. Requeuing the replica set ensures // Multiple things could lead to this update failing. Requeuing the replica set ensures
// Returning an error causes a requeue without forcing a hotloop // Returning an error causes a requeue without forcing a hotloop

View File

@ -49,6 +49,7 @@ import (
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
utiltesting "k8s.io/client-go/util/testing" utiltesting "k8s.io/client-go/util/testing"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting" "k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
. "k8s.io/kubernetes/pkg/controller/testutil" . "k8s.io/kubernetes/pkg/controller/testutil"
@ -305,7 +306,7 @@ func TestSyncReplicaSetCreateFailures(t *testing.T) {
func TestSyncReplicaSetDormancy(t *testing.T) { func TestSyncReplicaSetDormancy(t *testing.T) {
// Setup a test server so we can lie about the current state of pods // Setup a test server so we can lie about the current state of pods
_, ctx := ktesting.NewTestContext(t) logger, ctx := ktesting.NewTestContext(t)
fakeHandler := utiltesting.FakeHandler{ fakeHandler := utiltesting.FakeHandler{
StatusCode: 200, StatusCode: 200,
ResponseBody: "{}", ResponseBody: "{}",
@ -357,7 +358,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) {
// Lowering expectations should lead to a sync that creates a replica, however the // Lowering expectations should lead to a sync that creates a replica, however the
// fakePodControl error will prevent this, leaving expectations at 0, 0 // fakePodControl error will prevent this, leaving expectations at 0, 0
manager.expectations.CreationObserved(rsKey) manager.expectations.CreationObserved(logger, rsKey)
rsSpec.Status.Replicas = 1 rsSpec.Status.Replicas = 1
rsSpec.Status.ReadyReplicas = 1 rsSpec.Status.ReadyReplicas = 1
rsSpec.Status.AvailableReplicas = 1 rsSpec.Status.AvailableReplicas = 1
@ -1094,7 +1095,7 @@ type FakeRSExpectations struct {
expSatisfied func() expSatisfied func()
} }
func (fe FakeRSExpectations) SatisfiedExpectations(controllerKey string) bool { func (fe FakeRSExpectations) SatisfiedExpectations(logger klog.Logger, controllerKey string) bool {
fe.expSatisfied() fe.expSatisfied()
return fe.satisfied return fe.satisfied
} }
@ -1410,7 +1411,7 @@ func TestDeletionTimestamp(t *testing.T) {
pod := newPodList(nil, 1, v1.PodPending, labelMap, rs, "pod").Items[0] pod := newPodList(nil, 1, v1.PodPending, labelMap, rs, "pod").Items[0]
pod.DeletionTimestamp = &metav1.Time{Time: time.Now()} pod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
pod.ResourceVersion = "1" pod.ResourceVersion = "1"
manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(&pod)}) manager.expectations.ExpectDeletions(logger, rsKey, []string{controller.PodKey(&pod)})
// A pod added with a deletion timestamp should decrement deletions, not creations. // A pod added with a deletion timestamp should decrement deletions, not creations.
manager.addPod(logger, &pod) manager.addPod(logger, &pod)
@ -1430,7 +1431,7 @@ func TestDeletionTimestamp(t *testing.T) {
// as a deletion. // as a deletion.
oldPod := newPodList(nil, 1, v1.PodPending, labelMap, rs, "pod").Items[0] oldPod := newPodList(nil, 1, v1.PodPending, labelMap, rs, "pod").Items[0]
oldPod.ResourceVersion = "2" oldPod.ResourceVersion = "2"
manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(&pod)}) manager.expectations.ExpectDeletions(logger, rsKey, []string{controller.PodKey(&pod)})
manager.updatePod(logger, &oldPod, &pod) manager.updatePod(logger, &oldPod, &pod)
queueRS, _ = manager.queue.Get() queueRS, _ = manager.queue.Get()
@ -1457,7 +1458,7 @@ func TestDeletionTimestamp(t *testing.T) {
}, },
}, },
} }
manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(secondPod)}) manager.expectations.ExpectDeletions(logger, rsKey, []string{controller.PodKey(secondPod)})
oldPod.DeletionTimestamp = &metav1.Time{Time: time.Now()} oldPod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
oldPod.ResourceVersion = "2" oldPod.ResourceVersion = "2"
manager.updatePod(logger, &oldPod, &pod) manager.updatePod(logger, &oldPod, &pod)

View File

@ -34,7 +34,6 @@ import (
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing" clienttesting "k8s.io/client-go/testing"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting" "k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
volumeutil "k8s.io/kubernetes/pkg/volume/util" volumeutil "k8s.io/kubernetes/pkg/volume/util"
@ -445,7 +444,7 @@ func TestPVCProtectionController(t *testing.T) {
break break
} }
if ctrl.queue.Len() > 0 { if ctrl.queue.Len() > 0 {
klog.V(5).Infof("Test %q: %d events queue, processing one", test.name, ctrl.queue.Len()) logger.V(5).Info("Non-empty queue, processing one", "test", test.name, "queueLength", ctrl.queue.Len())
ctrl.processNextWorkItem(context.TODO()) ctrl.processNextWorkItem(context.TODO())
} }
if ctrl.queue.Len() > 0 { if ctrl.queue.Len() > 0 {
@ -456,7 +455,7 @@ func TestPVCProtectionController(t *testing.T) {
if currentActionCount < len(test.expectedActions) { if currentActionCount < len(test.expectedActions) {
// Do not log every wait, only when the action count changes. // Do not log every wait, only when the action count changes.
if lastReportedActionCount < currentActionCount { if lastReportedActionCount < currentActionCount {
klog.V(5).Infof("Test %q: got %d actions out of %d, waiting for the rest", test.name, currentActionCount, len(test.expectedActions)) logger.V(5).Info("Waiting for the remaining actions", "test", test.name, "currentActionCount", currentActionCount, "expectedActionCount", len(test.expectedActions))
lastReportedActionCount = currentActionCount lastReportedActionCount = currentActionCount
} }
// The test expected more to happen, wait for the actions. // The test expected more to happen, wait for the actions.

View File

@ -33,7 +33,6 @@ import (
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing" clienttesting "k8s.io/client-go/testing"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting" "k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
volumeutil "k8s.io/kubernetes/pkg/volume/util" volumeutil "k8s.io/kubernetes/pkg/volume/util"
@ -227,7 +226,7 @@ func TestPVProtectionController(t *testing.T) {
break break
} }
if ctrl.queue.Len() > 0 { if ctrl.queue.Len() > 0 {
klog.V(5).Infof("Test %q: %d events queue, processing one", test.name, ctrl.queue.Len()) logger.V(5).Info("Non-empty events queue, processing one", "test", test.name, "queueLength", ctrl.queue.Len())
ctrl.processNextWorkItem(context.TODO()) ctrl.processNextWorkItem(context.TODO())
} }
if ctrl.queue.Len() > 0 { if ctrl.queue.Len() > 0 {
@ -238,7 +237,7 @@ func TestPVProtectionController(t *testing.T) {
if currentActionCount < len(test.expectedActions) { if currentActionCount < len(test.expectedActions) {
// Do not log evey wait, only when the action count changes. // Do not log evey wait, only when the action count changes.
if lastReportedActionCount < currentActionCount { if lastReportedActionCount < currentActionCount {
klog.V(5).Infof("Test %q: got %d actions out of %d, waiting for the rest", test.name, currentActionCount, len(test.expectedActions)) logger.V(5).Info("Waiting for the remaining actions", "test", test.name, "currentActionCount", currentActionCount, "expectedActionCount", len(test.expectedActions))
lastReportedActionCount = currentActionCount lastReportedActionCount = currentActionCount
} }
// The test expected more to happen, wait for the actions. // The test expected more to happen, wait for the actions.

View File

@ -70,6 +70,7 @@ func TestEndpointSliceMirroring(t *testing.T) {
1*time.Second) 1*time.Second)
epsmController := endpointslicemirroring.NewController( epsmController := endpointslicemirroring.NewController(
ctx,
informers.Core().V1().Endpoints(), informers.Core().V1().Endpoints(),
informers.Discovery().V1().EndpointSlices(), informers.Discovery().V1().EndpointSlices(),
informers.Core().V1().Services(), informers.Core().V1().Services(),
@ -311,6 +312,7 @@ func TestEndpointSliceMirroring(t *testing.T) {
} }
func TestEndpointSliceMirroringUpdates(t *testing.T) { func TestEndpointSliceMirroringUpdates(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd()) server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
defer server.TearDownFn() defer server.TearDownFn()
@ -324,6 +326,7 @@ func TestEndpointSliceMirroringUpdates(t *testing.T) {
informers := informers.NewSharedInformerFactory(client, resyncPeriod) informers := informers.NewSharedInformerFactory(client, resyncPeriod)
epsmController := endpointslicemirroring.NewController( epsmController := endpointslicemirroring.NewController(
ctx,
informers.Core().V1().Endpoints(), informers.Core().V1().Endpoints(),
informers.Discovery().V1().EndpointSlices(), informers.Discovery().V1().EndpointSlices(),
informers.Core().V1().Services(), informers.Core().V1().Services(),
@ -332,7 +335,6 @@ func TestEndpointSliceMirroringUpdates(t *testing.T) {
1*time.Second) 1*time.Second)
// Start informer and controllers // Start informer and controllers
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
informers.Start(ctx.Done()) informers.Start(ctx.Done())
@ -487,6 +489,7 @@ func TestEndpointSliceMirroringUpdates(t *testing.T) {
} }
func TestEndpointSliceMirroringSelectorTransition(t *testing.T) { func TestEndpointSliceMirroringSelectorTransition(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd()) server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
defer server.TearDownFn() defer server.TearDownFn()
@ -500,6 +503,7 @@ func TestEndpointSliceMirroringSelectorTransition(t *testing.T) {
informers := informers.NewSharedInformerFactory(client, resyncPeriod) informers := informers.NewSharedInformerFactory(client, resyncPeriod)
epsmController := endpointslicemirroring.NewController( epsmController := endpointslicemirroring.NewController(
ctx,
informers.Core().V1().Endpoints(), informers.Core().V1().Endpoints(),
informers.Discovery().V1().EndpointSlices(), informers.Discovery().V1().EndpointSlices(),
informers.Core().V1().Services(), informers.Core().V1().Services(),
@ -508,7 +512,6 @@ func TestEndpointSliceMirroringSelectorTransition(t *testing.T) {
1*time.Second) 1*time.Second)
// Start informer and controllers // Start informer and controllers
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
informers.Start(ctx.Done()) informers.Start(ctx.Done())