diff --git a/cmd/cloud-controller-manager/.import-restrictions b/cmd/cloud-controller-manager/.import-restrictions index 24cdef34a6e..53eed5eb7b8 100644 --- a/cmd/cloud-controller-manager/.import-restrictions +++ b/cmd/cloud-controller-manager/.import-restrictions @@ -41,4 +41,5 @@ rules: - k8s.io/kubernetes/pkg/proxy/util - k8s.io/kubernetes/pkg/proxy/util/testing - k8s.io/kubernetes/pkg/util/slice - - k8s.io/kubernetes/pkg/util/sysctl \ No newline at end of file + - k8s.io/kubernetes/pkg/util/sysctl + - k8s.io/kubernetes/test/utils/ktesting diff --git a/cmd/kube-controller-manager/app/discovery.go b/cmd/kube-controller-manager/app/discovery.go index a09bebddfdc..7ed711a8d73 100644 --- a/cmd/kube-controller-manager/app/discovery.go +++ b/cmd/kube-controller-manager/app/discovery.go @@ -43,6 +43,7 @@ func startEndpointSliceController(ctx context.Context, controllerContext Control func startEndpointSliceMirroringController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { go endpointslicemirroringcontroller.NewController( + ctx, controllerContext.InformerFactory.Core().V1().Endpoints(), controllerContext.InformerFactory.Discovery().V1().EndpointSlices(), controllerContext.InformerFactory.Core().V1().Services(), diff --git a/hack/logcheck.conf b/hack/logcheck.conf index 4a397f268fd..9a44e6bcadc 100644 --- a/hack/logcheck.conf +++ b/hack/logcheck.conf @@ -34,19 +34,6 @@ contextual k8s.io/kubernetes/cmd/kube-scheduler/.* contextual k8s.io/kubernetes/pkg/controller/.* contextual k8s.io/kubernetes/test/e2e/dra/.* -# Most of kube-controller-manager has been converted, but not everything. At -# this point it is easier to list the exceptions. --contextual k8s.io/kubernetes/pkg/controller/controller_ref_manager.go --contextual k8s.io/kubernetes/pkg/controller/controller_utils.go --contextual k8s.io/kubernetes/pkg/controller/endpoint/.* --contextual k8s.io/kubernetes/pkg/controller/endpointslicemirroring/.* --contextual k8s.io/kubernetes/pkg/controller/garbagecollector/.* --contextual k8s.io/kubernetes/pkg/controller/nodeipam/.* --contextual k8s.io/kubernetes/pkg/controller/podgc/.* --contextual k8s.io/kubernetes/pkg/controller/replicaset/.* --contextual k8s.io/kubernetes/pkg/controller/volume/pvcprotection/pvc_protection_controller_test.go --contextual k8s.io/kubernetes/pkg/controller/volume/pvprotection/pv_protection_controller_test.go - # As long as contextual logging is alpha or beta, all WithName, WithValues, # NewContext calls have to go through klog. Once it is GA, we can lift # this restriction. Whether we then do a global search/replace remains diff --git a/pkg/controller/controller_ref_manager.go b/pkg/controller/controller_ref_manager.go index 740c98d32a8..68a99ea6f59 100644 --- a/pkg/controller/controller_ref_manager.go +++ b/pkg/controller/controller_ref_manager.go @@ -236,8 +236,8 @@ func (m *PodControllerRefManager) AdoptPod(ctx context.Context, pod *v1.Pod) err // ReleasePod sends a patch to free the pod from the control of the controller. // It returns the error if the patching fails. 404 and 422 errors are ignored. func (m *PodControllerRefManager) ReleasePod(ctx context.Context, pod *v1.Pod) error { - klog.V(2).Infof("patching pod %s_%s to remove its controllerRef to %s/%s:%s", - pod.Namespace, pod.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.Controller.GetName()) + logger := klog.FromContext(ctx) + logger.V(2).Info("Patching pod to remove its controllerRef", "pod", klog.KObj(pod), "gvk", m.controllerKind, "controller", m.Controller.GetName()) patchBytes, err := GenerateDeleteOwnerRefStrategicMergeBytes(pod.UID, []types.UID{m.Controller.GetUID()}, m.finalizers...) if err != nil { return err @@ -361,8 +361,8 @@ func (m *ReplicaSetControllerRefManager) AdoptReplicaSet(ctx context.Context, rs // ReleaseReplicaSet sends a patch to free the ReplicaSet from the control of the Deployment controller. // It returns the error if the patching fails. 404 and 422 errors are ignored. func (m *ReplicaSetControllerRefManager) ReleaseReplicaSet(ctx context.Context, replicaSet *apps.ReplicaSet) error { - klog.V(2).Infof("patching ReplicaSet %s_%s to remove its controllerRef to %s/%s:%s", - replicaSet.Namespace, replicaSet.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.Controller.GetName()) + logger := klog.FromContext(ctx) + logger.V(2).Info("Patching ReplicaSet to remove its controllerRef", "replicaSet", klog.KObj(replicaSet), "gvk", m.controllerKind, "controller", m.Controller.GetName()) patchBytes, err := GenerateDeleteOwnerRefStrategicMergeBytes(replicaSet.UID, []types.UID{m.Controller.GetUID()}) if err != nil { return err @@ -499,8 +499,8 @@ func (m *ControllerRevisionControllerRefManager) AdoptControllerRevision(ctx con // ReleaseControllerRevision sends a patch to free the ControllerRevision from the control of its controller. // It returns the error if the patching fails. 404 and 422 errors are ignored. func (m *ControllerRevisionControllerRefManager) ReleaseControllerRevision(ctx context.Context, history *apps.ControllerRevision) error { - klog.V(2).Infof("patching ControllerRevision %s_%s to remove its controllerRef to %s/%s:%s", - history.Namespace, history.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.Controller.GetName()) + logger := klog.FromContext(ctx) + logger.V(2).Info("Patching ControllerRevision to remove its controllerRef", "controllerRevision", klog.KObj(history), "gvk", m.controllerKind, "controller", m.Controller.GetName()) patchBytes, err := GenerateDeleteOwnerRefStrategicMergeBytes(history.UID, []types.UID{m.Controller.GetUID()}) if err != nil { return err diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go index 252bfe365b0..5e94893f6f1 100644 --- a/pkg/controller/controller_utils.go +++ b/pkg/controller/controller_utils.go @@ -146,15 +146,15 @@ var ExpKeyFunc = func(obj interface{}) (string, error) { // types of controllers, because the keys might conflict across types. type ControllerExpectationsInterface interface { GetExpectations(controllerKey string) (*ControlleeExpectations, bool, error) - SatisfiedExpectations(controllerKey string) bool - DeleteExpectations(controllerKey string) - SetExpectations(controllerKey string, add, del int) error - ExpectCreations(controllerKey string, adds int) error - ExpectDeletions(controllerKey string, dels int) error - CreationObserved(controllerKey string) - DeletionObserved(controllerKey string) - RaiseExpectations(controllerKey string, add, del int) - LowerExpectations(controllerKey string, add, del int) + SatisfiedExpectations(logger klog.Logger, controllerKey string) bool + DeleteExpectations(logger klog.Logger, controllerKey string) + SetExpectations(logger klog.Logger, controllerKey string, add, del int) error + ExpectCreations(logger klog.Logger, controllerKey string, adds int) error + ExpectDeletions(logger klog.Logger, controllerKey string, dels int) error + CreationObserved(logger klog.Logger, controllerKey string) + DeletionObserved(logger klog.Logger, controllerKey string) + RaiseExpectations(logger klog.Logger, controllerKey string, add, del int) + LowerExpectations(logger klog.Logger, controllerKey string, add, del int) } // ControllerExpectations is a cache mapping controllers to what they expect to see before being woken up for a sync. @@ -172,10 +172,11 @@ func (r *ControllerExpectations) GetExpectations(controllerKey string) (*Control } // DeleteExpectations deletes the expectations of the given controller from the TTLStore. -func (r *ControllerExpectations) DeleteExpectations(controllerKey string) { +func (r *ControllerExpectations) DeleteExpectations(logger klog.Logger, controllerKey string) { if exp, exists, err := r.GetByKey(controllerKey); err == nil && exists { if err := r.Delete(exp); err != nil { - klog.V(2).Infof("Error deleting expectations for controller %v: %v", controllerKey, err) + + logger.V(2).Info("Error deleting expectations", "controller", controllerKey, "err", err) } } } @@ -183,27 +184,27 @@ func (r *ControllerExpectations) DeleteExpectations(controllerKey string) { // SatisfiedExpectations returns true if the required adds/dels for the given controller have been observed. // Add/del counts are established by the controller at sync time, and updated as controllees are observed by the controller // manager. -func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) bool { +func (r *ControllerExpectations) SatisfiedExpectations(logger klog.Logger, controllerKey string) bool { if exp, exists, err := r.GetExpectations(controllerKey); exists { if exp.Fulfilled() { - klog.V(4).InfoS("Controller expectations fulfilled", "expectations", exp) + logger.V(4).Info("Controller expectations fulfilled", "expectations", exp) return true } else if exp.isExpired() { - klog.V(4).InfoS("Controller expectations expired", "expectations", exp) + logger.V(4).Info("Controller expectations expired", "expectations", exp) return true } else { - klog.V(4).InfoS("Controller still waiting on expectations", "expectations", exp) + logger.V(4).Info("Controller still waiting on expectations", "expectations", exp) return false } } else if err != nil { - klog.V(2).Infof("Error encountered while checking expectations %#v, forcing sync", err) + logger.V(2).Info("Error encountered while checking expectations, forcing sync", "err", err) } else { // When a new controller is created, it doesn't have expectations. // When it doesn't see expected watch events for > TTL, the expectations expire. // - In this case it wakes up, creates/deletes controllees, and sets expectations again. // When it has satisfied expectations and no controllees need to be created/destroyed > TTL, the expectations expire. // - In this case it continues without setting expectations till it needs to create/delete controllees. - klog.V(4).Infof("Controller %v either never recorded expectations, or the ttl expired.", controllerKey) + logger.V(4).Info("Controller either never recorded expectations, or the ttl expired", "controller", controllerKey) } // Trigger a sync if we either encountered and error (which shouldn't happen since we're // getting from local store) or this controller hasn't established expectations. @@ -218,46 +219,46 @@ func (exp *ControlleeExpectations) isExpired() bool { } // SetExpectations registers new expectations for the given controller. Forgets existing expectations. -func (r *ControllerExpectations) SetExpectations(controllerKey string, add, del int) error { +func (r *ControllerExpectations) SetExpectations(logger klog.Logger, controllerKey string, add, del int) error { exp := &ControlleeExpectations{add: int64(add), del: int64(del), key: controllerKey, timestamp: clock.RealClock{}.Now()} - klog.V(4).InfoS("Setting expectations", "expectations", exp) + logger.V(4).Info("Setting expectations", "expectations", exp) return r.Add(exp) } -func (r *ControllerExpectations) ExpectCreations(controllerKey string, adds int) error { - return r.SetExpectations(controllerKey, adds, 0) +func (r *ControllerExpectations) ExpectCreations(logger klog.Logger, controllerKey string, adds int) error { + return r.SetExpectations(logger, controllerKey, adds, 0) } -func (r *ControllerExpectations) ExpectDeletions(controllerKey string, dels int) error { - return r.SetExpectations(controllerKey, 0, dels) +func (r *ControllerExpectations) ExpectDeletions(logger klog.Logger, controllerKey string, dels int) error { + return r.SetExpectations(logger, controllerKey, 0, dels) } // Decrements the expectation counts of the given controller. -func (r *ControllerExpectations) LowerExpectations(controllerKey string, add, del int) { +func (r *ControllerExpectations) LowerExpectations(logger klog.Logger, controllerKey string, add, del int) { if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists { exp.Add(int64(-add), int64(-del)) // The expectations might've been modified since the update on the previous line. - klog.V(4).InfoS("Lowered expectations", "expectations", exp) + logger.V(4).Info("Lowered expectations", "expectations", exp) } } // Increments the expectation counts of the given controller. -func (r *ControllerExpectations) RaiseExpectations(controllerKey string, add, del int) { +func (r *ControllerExpectations) RaiseExpectations(logger klog.Logger, controllerKey string, add, del int) { if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists { exp.Add(int64(add), int64(del)) // The expectations might've been modified since the update on the previous line. - klog.V(4).Infof("Raised expectations", "expectations", exp) + logger.V(4).Info("Raised expectations", "expectations", exp) } } // CreationObserved atomically decrements the `add` expectation count of the given controller. -func (r *ControllerExpectations) CreationObserved(controllerKey string) { - r.LowerExpectations(controllerKey, 1, 0) +func (r *ControllerExpectations) CreationObserved(logger klog.Logger, controllerKey string) { + r.LowerExpectations(logger, controllerKey, 1, 0) } // DeletionObserved atomically decrements the `del` expectation count of the given controller. -func (r *ControllerExpectations) DeletionObserved(controllerKey string) { - r.LowerExpectations(controllerKey, 0, 1) +func (r *ControllerExpectations) DeletionObserved(logger klog.Logger, controllerKey string) { + r.LowerExpectations(logger, controllerKey, 0, 1) } // ControlleeExpectations track controllee creates/deletes. @@ -349,47 +350,47 @@ func (u *UIDTrackingControllerExpectations) GetUIDs(controllerKey string) sets.S } // ExpectDeletions records expectations for the given deleteKeys, against the given controller. -func (u *UIDTrackingControllerExpectations) ExpectDeletions(rcKey string, deletedKeys []string) error { +func (u *UIDTrackingControllerExpectations) ExpectDeletions(logger klog.Logger, rcKey string, deletedKeys []string) error { expectedUIDs := sets.NewString() for _, k := range deletedKeys { expectedUIDs.Insert(k) } - klog.V(4).Infof("Controller %v waiting on deletions for: %+v", rcKey, deletedKeys) + logger.V(4).Info("Controller waiting on deletions", "controller", rcKey, "keys", deletedKeys) u.uidStoreLock.Lock() defer u.uidStoreLock.Unlock() if existing := u.GetUIDs(rcKey); existing != nil && existing.Len() != 0 { - klog.Errorf("Clobbering existing delete keys: %+v", existing) + logger.Error(nil, "Clobbering existing delete keys", "keys", existing) } if err := u.uidStore.Add(&UIDSet{expectedUIDs, rcKey}); err != nil { return err } - return u.ControllerExpectationsInterface.ExpectDeletions(rcKey, expectedUIDs.Len()) + return u.ControllerExpectationsInterface.ExpectDeletions(logger, rcKey, expectedUIDs.Len()) } // DeletionObserved records the given deleteKey as a deletion, for the given rc. -func (u *UIDTrackingControllerExpectations) DeletionObserved(rcKey, deleteKey string) { +func (u *UIDTrackingControllerExpectations) DeletionObserved(logger klog.Logger, rcKey, deleteKey string) { u.uidStoreLock.Lock() defer u.uidStoreLock.Unlock() uids := u.GetUIDs(rcKey) if uids != nil && uids.Has(deleteKey) { - klog.V(4).Infof("Controller %v received delete for pod %v", rcKey, deleteKey) - u.ControllerExpectationsInterface.DeletionObserved(rcKey) + logger.V(4).Info("Controller received delete for pod", "controller", rcKey, "key", deleteKey) + u.ControllerExpectationsInterface.DeletionObserved(logger, rcKey) uids.Delete(deleteKey) } } // DeleteExpectations deletes the UID set and invokes DeleteExpectations on the // underlying ControllerExpectationsInterface. -func (u *UIDTrackingControllerExpectations) DeleteExpectations(rcKey string) { +func (u *UIDTrackingControllerExpectations) DeleteExpectations(logger klog.Logger, rcKey string) { u.uidStoreLock.Lock() defer u.uidStoreLock.Unlock() - u.ControllerExpectationsInterface.DeleteExpectations(rcKey) + u.ControllerExpectationsInterface.DeleteExpectations(logger, rcKey) if uidExp, exists, err := u.uidStore.GetByKey(rcKey); err == nil && exists { if err := u.uidStore.Delete(uidExp); err != nil { - klog.V(2).Infof("Error deleting uid expectations for controller %v: %v", rcKey, err) + logger.V(2).Info("Error deleting uid expectations", "controller", rcKey, "err", err) } } } @@ -587,12 +588,13 @@ func (r RealPodControl) createPods(ctx context.Context, namespace string, pod *v } return err } + logger := klog.FromContext(ctx) accessor, err := meta.Accessor(object) if err != nil { - klog.Errorf("parentObject does not have ObjectMeta, %v", err) + logger.Error(err, "parentObject does not have ObjectMeta") return nil } - klog.V(4).Infof("Controller %v created pod %v", accessor.GetName(), newPod.Name) + logger.V(4).Info("Controller created pod", "controller", accessor.GetName(), "pod", klog.KObj(newPod)) r.Recorder.Eventf(object, v1.EventTypeNormal, SuccessfulCreatePodReason, "Created pod: %v", newPod.Name) return nil @@ -603,10 +605,11 @@ func (r RealPodControl) DeletePod(ctx context.Context, namespace string, podID s if err != nil { return fmt.Errorf("object does not have ObjectMeta, %v", err) } - klog.V(2).InfoS("Deleting pod", "controller", accessor.GetName(), "pod", klog.KRef(namespace, podID)) + logger := klog.FromContext(ctx) + logger.V(2).Info("Deleting pod", "controller", accessor.GetName(), "pod", klog.KRef(namespace, podID)) if err := r.KubeClient.CoreV1().Pods(namespace).Delete(ctx, podID, metav1.DeleteOptions{}); err != nil { if apierrors.IsNotFound(err) { - klog.V(4).Infof("pod %v/%v has already been deleted.", namespace, podID) + logger.V(4).Info("Pod has already been deleted.", "pod", klog.KRef(namespace, podID)) return err } r.Recorder.Eventf(object, v1.EventTypeWarning, FailedDeletePodReason, "Error deleting: %v", err) @@ -943,14 +946,13 @@ func maxContainerRestarts(pod *v1.Pod) int { } // FilterActivePods returns pods that have not terminated. -func FilterActivePods(pods []*v1.Pod) []*v1.Pod { +func FilterActivePods(logger klog.Logger, pods []*v1.Pod) []*v1.Pod { var result []*v1.Pod for _, p := range pods { if IsPodActive(p) { result = append(result, p) } else { - klog.V(4).Infof("Ignoring inactive pod %v/%v in state %v, deletion time %v", - p.Namespace, p.Name, p.Status.Phase, p.DeletionTimestamp) + logger.V(4).Info("Ignoring inactive pod", "pod", klog.KObj(p), "phase", p.Status.Phase, "deletionTime", p.DeletionTimestamp) } } return result diff --git a/pkg/controller/controller_utils_test.go b/pkg/controller/controller_utils_test.go index ce816f5870a..99f23df4dcb 100644 --- a/pkg/controller/controller_utils_test.go +++ b/pkg/controller/controller_utils_test.go @@ -51,6 +51,7 @@ import ( "k8s.io/kubernetes/pkg/controller/testutil" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/securitycontext" + "k8s.io/kubernetes/test/utils/ktesting" testingclock "k8s.io/utils/clock/testing" "k8s.io/utils/pointer" @@ -170,6 +171,7 @@ func newReplicaSet(name string, replicas int) *apps.ReplicaSet { } func TestControllerExpectations(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) ttl := 30 * time.Second e, fakeClock := NewFakeControllerExpectationsLookup(ttl) // In practice we can't really have add and delete expectations since we only either create or @@ -182,26 +184,26 @@ func TestControllerExpectations(t *testing.T) { rcKey, err := KeyFunc(rc) assert.NoError(t, err, "Couldn't get key for object %#v: %v", rc, err) - e.SetExpectations(rcKey, adds, dels) + e.SetExpectations(logger, rcKey, adds, dels) var wg sync.WaitGroup for i := 0; i < adds+1; i++ { wg.Add(1) go func() { // In prod this can happen either because of a failed create by the rc // or after having observed a create via informer - e.CreationObserved(rcKey) + e.CreationObserved(logger, rcKey) wg.Done() }() } wg.Wait() // There are still delete expectations - assert.False(t, e.SatisfiedExpectations(rcKey), "Rc will sync before expectations are met") + assert.False(t, e.SatisfiedExpectations(logger, rcKey), "Rc will sync before expectations are met") for i := 0; i < dels+1; i++ { wg.Add(1) go func() { - e.DeletionObserved(rcKey) + e.DeletionObserved(logger, rcKey) wg.Done() }() } @@ -215,10 +217,10 @@ func TestControllerExpectations(t *testing.T) { add, del := podExp.GetExpectations() assert.Equal(t, int64(-1), add, "Unexpected pod expectations %#v", podExp) assert.Equal(t, int64(-1), del, "Unexpected pod expectations %#v", podExp) - assert.True(t, e.SatisfiedExpectations(rcKey), "Expectations are met but the rc will not sync") + assert.True(t, e.SatisfiedExpectations(logger, rcKey), "Expectations are met but the rc will not sync") // Next round of rc sync, old expectations are cleared - e.SetExpectations(rcKey, 1, 2) + e.SetExpectations(logger, rcKey, 1, 2) podExp, exists, err = e.GetExpectations(rcKey) assert.NoError(t, err, "Could not get expectations for rc, exists %v and err %v", exists, err) assert.True(t, exists, "Could not get expectations for rc, exists %v and err %v", exists, err) @@ -229,11 +231,12 @@ func TestControllerExpectations(t *testing.T) { // Expectations have expired because of ttl fakeClock.Step(ttl + 1) - assert.True(t, e.SatisfiedExpectations(rcKey), + assert.True(t, e.SatisfiedExpectations(logger, rcKey), "Expectations should have expired but didn't") } func TestUIDExpectations(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) uidExp := NewUIDTrackingControllerExpectations(NewControllerExpectations()) rcList := []*v1.ReplicationController{ newReplicationController(2), @@ -261,24 +264,24 @@ func TestUIDExpectations(t *testing.T) { rcPodNames = append(rcPodNames, PodKey(p)) } rcToPods[rcKey] = rcPodNames - uidExp.ExpectDeletions(rcKey, rcPodNames) + uidExp.ExpectDeletions(logger, rcKey, rcPodNames) } for i := range rcKeys { j := rand.Intn(i + 1) rcKeys[i], rcKeys[j] = rcKeys[j], rcKeys[i] } for _, rcKey := range rcKeys { - assert.False(t, uidExp.SatisfiedExpectations(rcKey), + assert.False(t, uidExp.SatisfiedExpectations(logger, rcKey), "Controller %v satisfied expectations before deletion", rcKey) for _, p := range rcToPods[rcKey] { - uidExp.DeletionObserved(rcKey, p) + uidExp.DeletionObserved(logger, rcKey, p) } - assert.True(t, uidExp.SatisfiedExpectations(rcKey), + assert.True(t, uidExp.SatisfiedExpectations(logger, rcKey), "Controller %v didn't satisfy expectations after deletion", rcKey) - uidExp.DeleteExpectations(rcKey) + uidExp.DeleteExpectations(logger, rcKey) assert.Nil(t, uidExp.GetUIDs(rcKey), "Failed to delete uid expectations for %v", rcKey) @@ -378,6 +381,7 @@ func TestDeletePodsAllowsMissing(t *testing.T) { } func TestActivePodFiltering(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) // This rc is not needed by the test, only the newPodList to give the pods labels/a namespace. rc := newReplicationController(0) podList := newPodList(nil, 5, v1.PodRunning, rc) @@ -392,7 +396,7 @@ func TestActivePodFiltering(t *testing.T) { for i := range podList.Items { podPointers = append(podPointers, &podList.Items[i]) } - got := FilterActivePods(podPointers) + got := FilterActivePods(logger, podPointers) gotNames := sets.NewString() for _, pod := range got { gotNames.Insert(pod.Name) diff --git a/pkg/controller/daemon/daemon_controller.go b/pkg/controller/daemon/daemon_controller.go index 8802eeb81dc..040251918b9 100644 --- a/pkg/controller/daemon/daemon_controller.go +++ b/pkg/controller/daemon/daemon_controller.go @@ -272,7 +272,7 @@ func (dsc *DaemonSetsController) deleteDaemonset(logger klog.Logger, obj interfa } // Delete expectations for the DaemonSet so if we create a new one with the same name it starts clean - dsc.expectations.DeleteExpectations(key) + dsc.expectations.DeleteExpectations(logger, key) dsc.queue.Add(key) } @@ -518,7 +518,7 @@ func (dsc *DaemonSetsController) addPod(logger klog.Logger, obj interface{}) { return } logger.V(4).Info("Pod added", "pod", klog.KObj(pod)) - dsc.expectations.CreationObserved(dsKey) + dsc.expectations.CreationObserved(logger, dsKey) dsc.enqueueDaemonSet(ds) return } @@ -635,7 +635,7 @@ func (dsc *DaemonSetsController) deletePod(logger klog.Logger, obj interface{}) return } logger.V(4).Info("Pod deleted", "pod", klog.KObj(pod)) - dsc.expectations.DeletionObserved(dsKey) + dsc.expectations.DeletionObserved(logger, dsKey) dsc.enqueueDaemonSet(ds) } @@ -934,7 +934,7 @@ func (dsc *DaemonSetsController) updateDaemonSet(ctx context.Context, ds *apps.D } // Process rolling updates if we're ready. - if dsc.expectations.SatisfiedExpectations(key) { + if dsc.expectations.SatisfiedExpectations(klog.FromContext(ctx), key) { switch ds.Spec.UpdateStrategy.Type { case apps.OnDeleteDaemonSetStrategyType: case apps.RollingUpdateDaemonSetStrategyType: @@ -1008,7 +1008,7 @@ func (dsc *DaemonSetsController) syncNodes(ctx context.Context, ds *apps.DaemonS deleteDiff = dsc.burstReplicas } - dsc.expectations.SetExpectations(dsKey, createDiff, deleteDiff) + dsc.expectations.SetExpectations(logger, dsKey, createDiff, deleteDiff) // error channel to communicate back failures. make the buffer big enough to avoid any blocking errCh := make(chan error, createDiff+deleteDiff) @@ -1057,7 +1057,7 @@ func (dsc *DaemonSetsController) syncNodes(ctx context.Context, ds *apps.DaemonS } if err != nil { logger.V(2).Info("Failed creation, decrementing expectations for daemon set", "daemonset", klog.KObj(ds)) - dsc.expectations.CreationObserved(dsKey) + dsc.expectations.CreationObserved(logger, dsKey) errCh <- err utilruntime.HandleError(err) } @@ -1068,7 +1068,7 @@ func (dsc *DaemonSetsController) syncNodes(ctx context.Context, ds *apps.DaemonS skippedPods := createDiff - (batchSize + pos) if errorCount < len(errCh) && skippedPods > 0 { logger.V(2).Info("Slow-start failure. Skipping creation pods, decrementing expectations for daemon set", "skippedPods", skippedPods, "daemonset", klog.KObj(ds)) - dsc.expectations.LowerExpectations(dsKey, skippedPods, 0) + dsc.expectations.LowerExpectations(logger, dsKey, skippedPods, 0) // The skipped pods will be retried later. The next controller resync will // retry the slow start process. break @@ -1082,7 +1082,7 @@ func (dsc *DaemonSetsController) syncNodes(ctx context.Context, ds *apps.DaemonS go func(ix int) { defer deleteWait.Done() if err := dsc.podControl.DeletePod(ctx, ds.Namespace, podsToDelete[ix], ds); err != nil { - dsc.expectations.DeletionObserved(dsKey) + dsc.expectations.DeletionObserved(logger, dsKey) if !apierrors.IsNotFound(err) { logger.V(2).Info("Failed deletion, decremented expectations for daemon set", "daemonset", klog.KObj(ds)) errCh <- err @@ -1232,7 +1232,7 @@ func (dsc *DaemonSetsController) syncDaemonSet(ctx context.Context, key string) ds, err := dsc.dsLister.DaemonSets(namespace).Get(name) if apierrors.IsNotFound(err) { logger.V(3).Info("Daemon set has been deleted", "daemonset", key) - dsc.expectations.DeleteExpectations(key) + dsc.expectations.DeleteExpectations(logger, key) return nil } if err != nil { @@ -1277,7 +1277,7 @@ func (dsc *DaemonSetsController) syncDaemonSet(ctx context.Context, key string) } hash := cur.Labels[apps.DefaultDaemonSetUniqueLabelKey] - if !dsc.expectations.SatisfiedExpectations(dsKey) { + if !dsc.expectations.SatisfiedExpectations(logger, dsKey) { // Only update status. Don't raise observedGeneration since controller didn't process object of that generation. return dsc.updateDaemonSetStatus(ctx, ds, nodeList, hash, false) } diff --git a/pkg/controller/daemon/daemon_controller_test.go b/pkg/controller/daemon/daemon_controller_test.go index bfaff5b6316..965df59edeb 100644 --- a/pkg/controller/daemon/daemon_controller_test.go +++ b/pkg/controller/daemon/daemon_controller_test.go @@ -44,6 +44,7 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/client-go/util/flowcontrol" "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" "k8s.io/klog/v2/ktesting" podutil "k8s.io/kubernetes/pkg/api/v1/pod" api "k8s.io/kubernetes/pkg/apis/core" @@ -275,7 +276,7 @@ func (f *fakePodControl) CreatePods(ctx context.Context, namespace string, templ ds := object.(*apps.DaemonSet) dsKey, _ := controller.KeyFunc(ds) - f.expectations.CreationObserved(dsKey) + f.expectations.CreationObserved(klog.FromContext(ctx), dsKey) return nil } @@ -295,7 +296,7 @@ func (f *fakePodControl) DeletePod(ctx context.Context, namespace string, podID ds := object.(*apps.DaemonSet) dsKey, _ := controller.KeyFunc(ds) - f.expectations.DeletionObserved(dsKey) + f.expectations.DeletionObserved(klog.FromContext(ctx), dsKey) return nil } @@ -424,7 +425,7 @@ func clearExpectations(t *testing.T, manager *daemonSetsController, ds *apps.Dae t.Errorf("Could not get key for daemon.") return } - manager.expectations.DeleteExpectations(key) + manager.expectations.DeleteExpectations(logger, key) now := manager.failedPodsBackoff.Clock.Now() hash, _ := currentDSHash(manager, ds) @@ -836,6 +837,7 @@ func TestSimpleDaemonSetPodCreateErrors(t *testing.T) { } func TestDaemonSetPodCreateExpectationsError(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) strategies := updateStrategies() for _, strategy := range strategies { ds := newDaemonSet("foo") @@ -860,7 +862,7 @@ func TestDaemonSetPodCreateExpectationsError(t *testing.T) { t.Fatalf("error get DaemonSets controller key: %v", err) } - if !manager.expectations.SatisfiedExpectations(dsKey) { + if !manager.expectations.SatisfiedExpectations(logger, dsKey) { t.Errorf("Unsatisfied pod creation expectations. Expected %d", creationExpectations) } } diff --git a/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller.go b/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller.go index efd1c994c16..8f408c4f678 100644 --- a/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller.go +++ b/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller.go @@ -67,13 +67,14 @@ const ( ) // NewController creates and initializes a new Controller -func NewController(endpointsInformer coreinformers.EndpointsInformer, +func NewController(ctx context.Context, endpointsInformer coreinformers.EndpointsInformer, endpointSliceInformer discoveryinformers.EndpointSliceInformer, serviceInformer coreinformers.ServiceInformer, maxEndpointsPerSubset int32, client clientset.Interface, endpointUpdatesBatchPeriod time.Duration, ) *Controller { + logger := klog.FromContext(ctx) broadcaster := record.NewBroadcaster() recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-slice-mirroring-controller"}) @@ -96,16 +97,24 @@ func NewController(endpointsInformer coreinformers.EndpointsInformer, } endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: c.onEndpointsAdd, - UpdateFunc: c.onEndpointsUpdate, - DeleteFunc: c.onEndpointsDelete, + AddFunc: func(obj interface{}) { + c.onEndpointsAdd(logger, obj) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + c.onEndpointsUpdate(logger, oldObj, newObj) + }, + DeleteFunc: func(obj interface{}) { + c.onEndpointsDelete(logger, obj) + }, }) c.endpointsLister = endpointsInformer.Lister() c.endpointsSynced = endpointsInformer.Informer().HasSynced endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: c.onEndpointSliceAdd, - UpdateFunc: c.onEndpointSliceUpdate, + AddFunc: c.onEndpointSliceAdd, + UpdateFunc: func(oldObj, newObj interface{}) { + c.onEndpointSliceUpdate(logger, oldObj, newObj) + }, DeleteFunc: c.onEndpointSliceDelete, }) @@ -393,21 +402,21 @@ func (c *Controller) onServiceDelete(obj interface{}) { } // onEndpointsAdd queues a sync for the relevant Endpoints resource. -func (c *Controller) onEndpointsAdd(obj interface{}) { +func (c *Controller) onEndpointsAdd(logger klog.Logger, obj interface{}) { endpoints := obj.(*v1.Endpoints) if endpoints == nil { utilruntime.HandleError(fmt.Errorf("onEndpointsAdd() expected type v1.Endpoints, got %T", obj)) return } if !c.shouldMirror(endpoints) { - klog.V(5).Infof("Skipping mirroring for %s/%s", endpoints.Namespace, endpoints.Name) + logger.V(5).Info("Skipping mirroring", "endpoints", klog.KObj(endpoints)) return } c.queueEndpoints(obj) } // onEndpointsUpdate queues a sync for the relevant Endpoints resource. -func (c *Controller) onEndpointsUpdate(prevObj, obj interface{}) { +func (c *Controller) onEndpointsUpdate(logger klog.Logger, prevObj, obj interface{}) { endpoints := obj.(*v1.Endpoints) prevEndpoints := prevObj.(*v1.Endpoints) if endpoints == nil || prevEndpoints == nil { @@ -415,21 +424,21 @@ func (c *Controller) onEndpointsUpdate(prevObj, obj interface{}) { return } if !c.shouldMirror(endpoints) && !c.shouldMirror(prevEndpoints) { - klog.V(5).Infof("Skipping mirroring for %s/%s", endpoints.Namespace, endpoints.Name) + logger.V(5).Info("Skipping mirroring", "endpoints", klog.KObj(endpoints)) return } c.queueEndpoints(obj) } // onEndpointsDelete queues a sync for the relevant Endpoints resource. -func (c *Controller) onEndpointsDelete(obj interface{}) { +func (c *Controller) onEndpointsDelete(logger klog.Logger, obj interface{}) { endpoints := getEndpointsFromDeleteAction(obj) if endpoints == nil { utilruntime.HandleError(fmt.Errorf("onEndpointsDelete() expected type v1.Endpoints, got %T", obj)) return } if !c.shouldMirror(endpoints) { - klog.V(5).Infof("Skipping mirroring for %s/%s", endpoints.Namespace, endpoints.Name) + logger.V(5).Info("Skipping mirroring", "endpoints", klog.KObj(endpoints)) return } c.queueEndpoints(obj) @@ -453,7 +462,7 @@ func (c *Controller) onEndpointSliceAdd(obj interface{}) { // sync if the EndpointSlice resource version does not match the expected // version in the endpointSliceTracker or the managed-by value of the // EndpointSlice has changed from or to this controller. -func (c *Controller) onEndpointSliceUpdate(prevObj, obj interface{}) { +func (c *Controller) onEndpointSliceUpdate(logger klog.Logger, prevObj, obj interface{}) { prevEndpointSlice := obj.(*discovery.EndpointSlice) endpointSlice := prevObj.(*discovery.EndpointSlice) if endpointSlice == nil || prevEndpointSlice == nil { @@ -466,7 +475,7 @@ func (c *Controller) onEndpointSliceUpdate(prevObj, obj interface{}) { svcName := endpointSlice.Labels[discovery.LabelServiceName] prevSvcName := prevEndpointSlice.Labels[discovery.LabelServiceName] if svcName != prevSvcName { - klog.Warningf("%s label changed from %s to %s for %s", discovery.LabelServiceName, prevSvcName, svcName, endpointSlice.Name) + logger.Info("LabelServiceName changed", "labelServiceName", discovery.LabelServiceName, "oldName", prevSvcName, "newName", svcName, "endpointSlice", klog.KObj(endpointSlice)) c.queueEndpointsForEndpointSlice(endpointSlice) c.queueEndpointsForEndpointSlice(prevEndpointSlice) return diff --git a/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller_test.go b/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller_test.go index 7024e4e891b..75dfa155752 100644 --- a/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller_test.go +++ b/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller_test.go @@ -49,11 +49,12 @@ type endpointSliceMirroringController struct { serviceStore cache.Store } -func newController(batchPeriod time.Duration) (*fake.Clientset, *endpointSliceMirroringController) { +func newController(ctx context.Context, batchPeriod time.Duration) (*fake.Clientset, *endpointSliceMirroringController) { client := newClientset() informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) esController := NewController( + ctx, informerFactory.Core().V1().Endpoints(), informerFactory.Discovery().V1().EndpointSlices(), informerFactory.Core().V1().Services(), @@ -224,7 +225,8 @@ func TestSyncEndpoints(t *testing.T) { for _, tc := range testCases { t.Run(tc.testName, func(t *testing.T) { - client, esController := newController(time.Duration(0)) + _, ctx := ktesting.NewTestContext(t) + client, esController := newController(ctx, time.Duration(0)) tc.endpoints.Name = endpointsName tc.endpoints.Namespace = namespace esController.endpointsStore.Add(tc.endpoints) @@ -320,7 +322,8 @@ func TestShouldMirror(t *testing.T) { for _, tc := range testCases { t.Run(tc.testName, func(t *testing.T) { - _, c := newController(time.Duration(0)) + _, ctx := ktesting.NewTestContext(t) + _, c := newController(ctx, time.Duration(0)) if tc.endpoints != nil { err := c.endpointsStore.Add(tc.endpoints) @@ -437,7 +440,8 @@ func TestEndpointSlicesMirroredForService(t *testing.T) { for _, tc := range testCases { t.Run(tc.testName, func(t *testing.T) { - _, c := newController(time.Duration(0)) + _, ctx := ktesting.NewTestContext(t) + _, c := newController(ctx, time.Duration(0)) err := c.endpointSliceStore.Add(tc.endpointSlice) if err != nil { diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 78d2b13408f..057e6939814 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -296,7 +296,7 @@ func (jm *Controller) addPod(logger klog.Logger, obj interface{}) { if err != nil { return } - jm.expectations.CreationObserved(jobKey) + jm.expectations.CreationObserved(logger, jobKey) jm.enqueueSyncJobBatched(logger, job) return } @@ -436,7 +436,7 @@ func (jm *Controller) deletePod(logger klog.Logger, obj interface{}, final bool) if err != nil { return } - jm.expectations.DeletionObserved(jobKey) + jm.expectations.DeletionObserved(logger, jobKey) // Consider the finalizer removed if this is the final delete. Otherwise, // it's an update for the deletion timestamp, then check finalizer. @@ -725,7 +725,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { if err != nil { if apierrors.IsNotFound(err) { logger.V(4).Info("Job has been deleted", "key", key) - jm.expectations.DeleteExpectations(key) + jm.expectations.DeleteExpectations(logger, key) jm.finalizerExpectations.deleteExpectations(logger, key) err := jm.podBackoffStore.removeBackoffRecord(key) @@ -775,7 +775,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { // Check the expectations of the job before counting active pods, otherwise a new pod can sneak in // and update the expectations after we've retrieved active pods from the store. If a new pod enters // the store after we've checked the expectation, the job sync is just deferred till the next relist. - satisfiedExpectations := jm.expectations.SatisfiedExpectations(key) + satisfiedExpectations := jm.expectations.SatisfiedExpectations(logger, key) pods, err := jm.getPodsForJob(ctx, &job) if err != nil { @@ -785,7 +785,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { jobCtx := &syncJobCtx{ job: &job, pods: pods, - activePods: controller.FilterActivePods(pods), + activePods: controller.FilterActivePods(logger, pods), uncounted: newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods), expectedRmFinalizers: jm.finalizerExpectations.getExpectedUIDs(key), } @@ -947,7 +947,7 @@ func (jm *Controller) deleteJobPods(ctx context.Context, job *batch.Job, jobKey failDelete := func(pod *v1.Pod, err error) { // Decrement the expected number of deletes because the informer won't observe this deletion - jm.expectations.DeletionObserved(jobKey) + jm.expectations.DeletionObserved(logger, jobKey) if !apierrors.IsNotFound(err) { logger.V(2).Info("Failed to delete Pod", "job", klog.KObj(job), "pod", klog.KObj(pod), "err", err) atomic.AddInt32(&successfulDeletes, -1) @@ -1394,7 +1394,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn if jobSuspended(job) { logger.V(4).Info("Deleting all active pods in suspended job", "job", klog.KObj(job), "active", active) podsToDelete := activePodsForRemoval(job, jobCtx.activePods, int(active)) - jm.expectations.ExpectDeletions(jobKey, len(podsToDelete)) + jm.expectations.ExpectDeletions(logger, jobKey, len(podsToDelete)) removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete) active -= removed return active, metrics.JobSyncActionPodsDeleted, err @@ -1431,7 +1431,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn podsToDelete = podsToDelete[:MaxPodCreateDeletePerSync] } if len(podsToDelete) > 0 { - jm.expectations.ExpectDeletions(jobKey, len(podsToDelete)) + jm.expectations.ExpectDeletions(logger, jobKey, len(podsToDelete)) logger.V(4).Info("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", wantActive) removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete) active -= removed @@ -1453,7 +1453,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn diff = int32(MaxPodCreateDeletePerSync) } - jm.expectations.ExpectCreations(jobKey, int(diff)) + jm.expectations.ExpectCreations(logger, jobKey, int(diff)) errCh := make(chan error, diff) logger.V(4).Info("Too few pods running", "key", jobKey, "need", wantActive, "creating", diff) @@ -1511,7 +1511,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn defer utilruntime.HandleError(err) // Decrement the expected number of creates because the informer won't observe this pod logger.V(2).Info("Failed creation, decrementing expectations", "job", klog.KObj(job)) - jm.expectations.CreationObserved(jobKey) + jm.expectations.CreationObserved(logger, jobKey) atomic.AddInt32(&active, -1) errCh <- err } @@ -1525,7 +1525,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn active -= skippedPods for i := int32(0); i < skippedPods; i++ { // Decrement the expected number of creates because the informer won't observe this pod - jm.expectations.CreationObserved(jobKey) + jm.expectations.CreationObserved(logger, jobKey) } // The skipped pods will be retried later. The next controller resync will // retry the slow start process. diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index 99ac30eae60..4095a0c33e0 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -785,6 +785,7 @@ func TestControllerSyncJob(t *testing.T) { for name, tc := range testCases { t.Run(name, func(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.jobReadyPodsEnabled)() // job manager setup @@ -828,9 +829,9 @@ func TestControllerSyncJob(t *testing.T) { manager.podBackoffStore.updateBackoffRecord(*tc.backoffRecord) } if tc.fakeExpectationAtCreation < 0 { - manager.expectations.ExpectDeletions(key, int(-tc.fakeExpectationAtCreation)) + manager.expectations.ExpectDeletions(logger, key, int(-tc.fakeExpectationAtCreation)) } else if tc.fakeExpectationAtCreation > 0 { - manager.expectations.ExpectCreations(key, int(tc.fakeExpectationAtCreation)) + manager.expectations.ExpectCreations(logger, key, int(tc.fakeExpectationAtCreation)) } if tc.wasSuspended { job.Status.Conditions = append(job.Status.Conditions, *newCondition(batch.JobSuspended, v1.ConditionTrue, "JobSuspended", "Job suspended", realClock.Now())) @@ -3673,7 +3674,7 @@ type FakeJobExpectations struct { expSatisfied func() } -func (fe FakeJobExpectations) SatisfiedExpectations(controllerKey string) bool { +func (fe FakeJobExpectations) SatisfiedExpectations(logger klog.Logger, controllerKey string) bool { fe.expSatisfied() return fe.satisfied } diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go index cc129b7d94e..1307d1b2903 100644 --- a/pkg/controller/replicaset/replica_set.go +++ b/pkg/controller/replicaset/replica_set.go @@ -210,8 +210,9 @@ func (rsc *ReplicaSetController) Run(ctx context.Context, workers int) { defer rsc.queue.ShutDown() controllerName := strings.ToLower(rsc.Kind) - klog.FromContext(ctx).Info("Starting controller", "name", controllerName) - defer klog.FromContext(ctx).Info("Shutting down controller", "name", controllerName) + logger := klog.FromContext(ctx) + logger.Info("Starting controller", "name", controllerName) + defer logger.Info("Shutting down controller", "name", controllerName) if !cache.WaitForNamedCacheSync(rsc.Kind, ctx.Done(), rsc.podListerSynced, rsc.rsListerSynced) { return @@ -371,7 +372,7 @@ func (rsc *ReplicaSetController) deleteRS(logger klog.Logger, obj interface{}) { logger.V(4).Info("Deleting", "replicaSet", klog.KObj(rs)) // Delete expectations for the ReplicaSet so if we create a new one with the same name it starts clean - rsc.expectations.DeleteExpectations(key) + rsc.expectations.DeleteExpectations(logger, key) rsc.queue.Add(key) } @@ -398,7 +399,7 @@ func (rsc *ReplicaSetController) addPod(logger klog.Logger, obj interface{}) { return } logger.V(4).Info("Pod created", "pod", klog.KObj(pod), "detail", pod) - rsc.expectations.CreationObserved(rsKey) + rsc.expectations.CreationObserved(logger, rsKey) rsc.queue.Add(rsKey) return } @@ -529,7 +530,7 @@ func (rsc *ReplicaSetController) deletePod(logger klog.Logger, obj interface{}) return } logger.V(4).Info("Pod deleted", "delete_by", utilruntime.GetCaller(), "deletion_timestamp", pod.DeletionTimestamp, "pod", klog.KObj(pod)) - rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod)) + rsc.expectations.DeletionObserved(logger, rsKey, controller.PodKey(pod)) rsc.queue.Add(rsKey) } @@ -569,6 +570,7 @@ func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPod utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err)) return nil } + logger := klog.FromContext(ctx) if diff < 0 { diff *= -1 if diff > rsc.burstReplicas { @@ -579,8 +581,8 @@ func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPod // UID, which would require locking *across* the create, which will turn // into a performance bottleneck. We should generate a UID for the pod // beforehand and store it via ExpectCreations. - rsc.expectations.ExpectCreations(rsKey, diff) - klog.FromContext(ctx).V(2).Info("Too few replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "creating", diff) + rsc.expectations.ExpectCreations(logger, rsKey, diff) + logger.V(2).Info("Too few replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "creating", diff) // Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize // and double with each successful iteration in a kind of "slow start". // This handles attempts to start large numbers of pods that would @@ -605,10 +607,10 @@ func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPod // The skipped pods will be retried later. The next controller resync will // retry the slow start process. if skippedPods := diff - successfulCreations; skippedPods > 0 { - klog.FromContext(ctx).V(2).Info("Slow-start failure. Skipping creation of pods, decrementing expectations", "podsSkipped", skippedPods, "kind", rsc.Kind, "replicaSet", klog.KObj(rs)) + logger.V(2).Info("Slow-start failure. Skipping creation of pods, decrementing expectations", "podsSkipped", skippedPods, "kind", rsc.Kind, "replicaSet", klog.KObj(rs)) for i := 0; i < skippedPods; i++ { // Decrement the expected number of creates because the informer won't observe this pod - rsc.expectations.CreationObserved(rsKey) + rsc.expectations.CreationObserved(logger, rsKey) } } return err @@ -616,9 +618,9 @@ func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPod if diff > rsc.burstReplicas { diff = rsc.burstReplicas } - klog.FromContext(ctx).V(2).Info("Too many replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "deleting", diff) + logger.V(2).Info("Too many replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "deleting", diff) - relatedPods, err := rsc.getIndirectlyRelatedPods(klog.FromContext(ctx), rs) + relatedPods, err := rsc.getIndirectlyRelatedPods(logger, rs) utilruntime.HandleError(err) // Choose which Pods to delete, preferring those in earlier phases of startup. @@ -630,7 +632,7 @@ func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPod // Note that if the labels on a pod/rs change in a way that the pod gets // orphaned, the rs will only wake up after the expectations have // expired even if other pods are deleted. - rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete)) + rsc.expectations.ExpectDeletions(logger, rsKey, getPodKeys(podsToDelete)) errCh := make(chan error, diff) var wg sync.WaitGroup @@ -641,9 +643,9 @@ func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPod if err := rsc.podControl.DeletePod(ctx, rs.Namespace, targetPod.Name, rs); err != nil { // Decrement the expected number of deletes because the informer won't observe this deletion podKey := controller.PodKey(targetPod) - rsc.expectations.DeletionObserved(rsKey, podKey) + rsc.expectations.DeletionObserved(logger, rsKey, podKey) if !apierrors.IsNotFound(err) { - klog.FromContext(ctx).V(2).Info("Failed to delete pod, decremented expectations", "pod", podKey, "kind", rsc.Kind, "replicaSet", klog.KObj(rs)) + logger.V(2).Info("Failed to delete pod, decremented expectations", "pod", podKey, "kind", rsc.Kind, "replicaSet", klog.KObj(rs)) errCh <- err } } @@ -668,9 +670,10 @@ func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPod // meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be // invoked concurrently with the same key. func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string) error { + logger := klog.FromContext(ctx) startTime := time.Now() defer func() { - klog.FromContext(ctx).V(4).Info("Finished syncing", "kind", rsc.Kind, "key", key, "duration", time.Since(startTime)) + logger.Info("Finished syncing", "kind", rsc.Kind, "key", key, "duration", time.Since(startTime)) }() namespace, name, err := cache.SplitMetaNamespaceKey(key) @@ -679,15 +682,15 @@ func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string) } rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name) if apierrors.IsNotFound(err) { - klog.FromContext(ctx).V(4).Info("deleted", "kind", rsc.Kind, "key", key) - rsc.expectations.DeleteExpectations(key) + logger.V(4).Info("deleted", "kind", rsc.Kind, "key", key) + rsc.expectations.DeleteExpectations(logger, key) return nil } if err != nil { return err } - rsNeedsSync := rsc.expectations.SatisfiedExpectations(key) + rsNeedsSync := rsc.expectations.SatisfiedExpectations(logger, key) selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector) if err != nil { utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector for rs %v/%v: %v", namespace, name, err)) @@ -702,7 +705,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string) return err } // Ignore inactive pods. - filteredPods := controller.FilterActivePods(allPods) + filteredPods := controller.FilterActivePods(logger, allPods) // NOTE: filteredPods are pointing to objects from cache - if you need to // modify them, you need to copy it first. @@ -719,7 +722,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string) newStatus := calculateStatus(rs, filteredPods, manageReplicasErr) // Always updates status as pods come up or die. - updatedRS, err := updateReplicaSetStatus(klog.FromContext(ctx), rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus) + updatedRS, err := updateReplicaSetStatus(logger, rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus) if err != nil { // Multiple things could lead to this update failing. Requeuing the replica set ensures // Returning an error causes a requeue without forcing a hotloop diff --git a/pkg/controller/replicaset/replica_set_test.go b/pkg/controller/replicaset/replica_set_test.go index b6fc90c57c4..12d4dfaf03f 100644 --- a/pkg/controller/replicaset/replica_set_test.go +++ b/pkg/controller/replicaset/replica_set_test.go @@ -49,6 +49,7 @@ import ( "k8s.io/client-go/tools/cache" utiltesting "k8s.io/client-go/util/testing" "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/controller" . "k8s.io/kubernetes/pkg/controller/testutil" @@ -305,7 +306,7 @@ func TestSyncReplicaSetCreateFailures(t *testing.T) { func TestSyncReplicaSetDormancy(t *testing.T) { // Setup a test server so we can lie about the current state of pods - _, ctx := ktesting.NewTestContext(t) + logger, ctx := ktesting.NewTestContext(t) fakeHandler := utiltesting.FakeHandler{ StatusCode: 200, ResponseBody: "{}", @@ -357,7 +358,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) { // Lowering expectations should lead to a sync that creates a replica, however the // fakePodControl error will prevent this, leaving expectations at 0, 0 - manager.expectations.CreationObserved(rsKey) + manager.expectations.CreationObserved(logger, rsKey) rsSpec.Status.Replicas = 1 rsSpec.Status.ReadyReplicas = 1 rsSpec.Status.AvailableReplicas = 1 @@ -1094,7 +1095,7 @@ type FakeRSExpectations struct { expSatisfied func() } -func (fe FakeRSExpectations) SatisfiedExpectations(controllerKey string) bool { +func (fe FakeRSExpectations) SatisfiedExpectations(logger klog.Logger, controllerKey string) bool { fe.expSatisfied() return fe.satisfied } @@ -1410,7 +1411,7 @@ func TestDeletionTimestamp(t *testing.T) { pod := newPodList(nil, 1, v1.PodPending, labelMap, rs, "pod").Items[0] pod.DeletionTimestamp = &metav1.Time{Time: time.Now()} pod.ResourceVersion = "1" - manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(&pod)}) + manager.expectations.ExpectDeletions(logger, rsKey, []string{controller.PodKey(&pod)}) // A pod added with a deletion timestamp should decrement deletions, not creations. manager.addPod(logger, &pod) @@ -1430,7 +1431,7 @@ func TestDeletionTimestamp(t *testing.T) { // as a deletion. oldPod := newPodList(nil, 1, v1.PodPending, labelMap, rs, "pod").Items[0] oldPod.ResourceVersion = "2" - manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(&pod)}) + manager.expectations.ExpectDeletions(logger, rsKey, []string{controller.PodKey(&pod)}) manager.updatePod(logger, &oldPod, &pod) queueRS, _ = manager.queue.Get() @@ -1457,7 +1458,7 @@ func TestDeletionTimestamp(t *testing.T) { }, }, } - manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(secondPod)}) + manager.expectations.ExpectDeletions(logger, rsKey, []string{controller.PodKey(secondPod)}) oldPod.DeletionTimestamp = &metav1.Time{Time: time.Now()} oldPod.ResourceVersion = "2" manager.updatePod(logger, &oldPod, &pod) diff --git a/pkg/controller/volume/pvcprotection/pvc_protection_controller_test.go b/pkg/controller/volume/pvcprotection/pvc_protection_controller_test.go index f131d26c04f..a75c752eaa9 100644 --- a/pkg/controller/volume/pvcprotection/pvc_protection_controller_test.go +++ b/pkg/controller/volume/pvcprotection/pvc_protection_controller_test.go @@ -34,7 +34,6 @@ import ( "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" clienttesting "k8s.io/client-go/testing" - "k8s.io/klog/v2" "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/controller" volumeutil "k8s.io/kubernetes/pkg/volume/util" @@ -445,7 +444,7 @@ func TestPVCProtectionController(t *testing.T) { break } if ctrl.queue.Len() > 0 { - klog.V(5).Infof("Test %q: %d events queue, processing one", test.name, ctrl.queue.Len()) + logger.V(5).Info("Non-empty queue, processing one", "test", test.name, "queueLength", ctrl.queue.Len()) ctrl.processNextWorkItem(context.TODO()) } if ctrl.queue.Len() > 0 { @@ -456,7 +455,7 @@ func TestPVCProtectionController(t *testing.T) { if currentActionCount < len(test.expectedActions) { // Do not log every wait, only when the action count changes. if lastReportedActionCount < currentActionCount { - klog.V(5).Infof("Test %q: got %d actions out of %d, waiting for the rest", test.name, currentActionCount, len(test.expectedActions)) + logger.V(5).Info("Waiting for the remaining actions", "test", test.name, "currentActionCount", currentActionCount, "expectedActionCount", len(test.expectedActions)) lastReportedActionCount = currentActionCount } // The test expected more to happen, wait for the actions. diff --git a/pkg/controller/volume/pvprotection/pv_protection_controller_test.go b/pkg/controller/volume/pvprotection/pv_protection_controller_test.go index 2cc876f044d..310f5fecd70 100644 --- a/pkg/controller/volume/pvprotection/pv_protection_controller_test.go +++ b/pkg/controller/volume/pvprotection/pv_protection_controller_test.go @@ -33,7 +33,6 @@ import ( "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" clienttesting "k8s.io/client-go/testing" - "k8s.io/klog/v2" "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/controller" volumeutil "k8s.io/kubernetes/pkg/volume/util" @@ -227,7 +226,7 @@ func TestPVProtectionController(t *testing.T) { break } if ctrl.queue.Len() > 0 { - klog.V(5).Infof("Test %q: %d events queue, processing one", test.name, ctrl.queue.Len()) + logger.V(5).Info("Non-empty events queue, processing one", "test", test.name, "queueLength", ctrl.queue.Len()) ctrl.processNextWorkItem(context.TODO()) } if ctrl.queue.Len() > 0 { @@ -238,7 +237,7 @@ func TestPVProtectionController(t *testing.T) { if currentActionCount < len(test.expectedActions) { // Do not log evey wait, only when the action count changes. if lastReportedActionCount < currentActionCount { - klog.V(5).Infof("Test %q: got %d actions out of %d, waiting for the rest", test.name, currentActionCount, len(test.expectedActions)) + logger.V(5).Info("Waiting for the remaining actions", "test", test.name, "currentActionCount", currentActionCount, "expectedActionCount", len(test.expectedActions)) lastReportedActionCount = currentActionCount } // The test expected more to happen, wait for the actions. diff --git a/test/integration/endpointslice/endpointslicemirroring_test.go b/test/integration/endpointslice/endpointslicemirroring_test.go index fc7f74f3d8c..3f7543f62f3 100644 --- a/test/integration/endpointslice/endpointslicemirroring_test.go +++ b/test/integration/endpointslice/endpointslicemirroring_test.go @@ -70,6 +70,7 @@ func TestEndpointSliceMirroring(t *testing.T) { 1*time.Second) epsmController := endpointslicemirroring.NewController( + ctx, informers.Core().V1().Endpoints(), informers.Discovery().V1().EndpointSlices(), informers.Core().V1().Services(), @@ -311,6 +312,7 @@ func TestEndpointSliceMirroring(t *testing.T) { } func TestEndpointSliceMirroringUpdates(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd()) defer server.TearDownFn() @@ -324,6 +326,7 @@ func TestEndpointSliceMirroringUpdates(t *testing.T) { informers := informers.NewSharedInformerFactory(client, resyncPeriod) epsmController := endpointslicemirroring.NewController( + ctx, informers.Core().V1().Endpoints(), informers.Discovery().V1().EndpointSlices(), informers.Core().V1().Services(), @@ -332,7 +335,6 @@ func TestEndpointSliceMirroringUpdates(t *testing.T) { 1*time.Second) // Start informer and controllers - _, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) defer cancel() informers.Start(ctx.Done()) @@ -487,6 +489,7 @@ func TestEndpointSliceMirroringUpdates(t *testing.T) { } func TestEndpointSliceMirroringSelectorTransition(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd()) defer server.TearDownFn() @@ -500,6 +503,7 @@ func TestEndpointSliceMirroringSelectorTransition(t *testing.T) { informers := informers.NewSharedInformerFactory(client, resyncPeriod) epsmController := endpointslicemirroring.NewController( + ctx, informers.Core().V1().Endpoints(), informers.Discovery().V1().EndpointSlices(), informers.Core().V1().Services(), @@ -508,7 +512,6 @@ func TestEndpointSliceMirroringSelectorTransition(t *testing.T) { 1*time.Second) // Start informer and controllers - _, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) defer cancel() informers.Start(ctx.Done())