Migrated pkg/controller/replicaset to contextual logging (#114871)

* migrated controller/replicaset to contextual logging

Signed-off-by: Naman <namanlakhwani@gmail.com>

* small nits

Signed-off-by: Naman <namanlakhwani@gmail.com>

* addressed changes

Signed-off-by: Naman <namanlakhwani@gmail.com>

* small nit

Signed-off-by: Naman <namanlakhwani@gmail.com>

* taking t as input

Signed-off-by: Naman <namanlakhwani@gmail.com>

---------

Signed-off-by: Naman <namanlakhwani@gmail.com>
This commit is contained in:
Naman Lakhwani 2023-03-07 17:49:51 +05:30 committed by GitHub
parent 4aaa4df840
commit 8f45b64c93
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 74 additions and 56 deletions

View File

@ -26,6 +26,7 @@ import (
"k8s.io/client-go/util/flowcontrol"
"k8s.io/controller-manager/controller"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller/daemon"
"k8s.io/kubernetes/pkg/controller/deployment"
"k8s.io/kubernetes/pkg/controller/replicaset"
@ -61,6 +62,7 @@ func startStatefulSetController(ctx context.Context, controllerContext Controlle
func startReplicaSetController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
go replicaset.NewReplicaSetController(
klog.FromContext(ctx),
controllerContext.InformerFactory.Apps().V1().ReplicaSets(),
controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.ClientBuilder.ClientOrDie("replicaset-controller"),

View File

@ -116,10 +116,10 @@ type ReplicaSetController struct {
}
// NewReplicaSetController configures a replica set controller with the specified event recorder
func NewReplicaSetController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController {
func NewReplicaSetController(logger klog.Logger, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController {
eventBroadcaster := record.NewBroadcaster()
if err := metrics.Register(legacyregistry.Register); err != nil {
klog.ErrorS(err, "unable to register metrics")
logger.Error(err, "unable to register metrics")
}
return NewBaseController(rsInformer, podInformer, kubeClient, burstReplicas,
apps.SchemeGroupVersion.WithKind("ReplicaSet"),
@ -198,8 +198,8 @@ func (rsc *ReplicaSetController) Run(ctx context.Context, workers int) {
defer rsc.queue.ShutDown()
controllerName := strings.ToLower(rsc.Kind)
klog.Infof("Starting %v controller", controllerName)
defer klog.Infof("Shutting down %v controller", controllerName)
klog.FromContext(ctx).Info("Starting controller", "name", controllerName)
defer klog.FromContext(ctx).Info("Shutting down controller", "name", controllerName)
if !cache.WaitForNamedCacheSync(rsc.Kind, ctx.Done(), rsc.podListerSynced, rsc.rsListerSynced) {
return
@ -214,7 +214,7 @@ func (rsc *ReplicaSetController) Run(ctx context.Context, workers int) {
// getReplicaSetsWithSameController returns a list of ReplicaSets with the same
// owner as the given ReplicaSet.
func (rsc *ReplicaSetController) getReplicaSetsWithSameController(rs *apps.ReplicaSet) []*apps.ReplicaSet {
func (rsc *ReplicaSetController) getReplicaSetsWithSameController(logger klog.Logger, rs *apps.ReplicaSet) []*apps.ReplicaSet {
controllerRef := metav1.GetControllerOf(rs)
if controllerRef == nil {
utilruntime.HandleError(fmt.Errorf("ReplicaSet has no controller: %v", rs))
@ -231,8 +231,8 @@ func (rsc *ReplicaSetController) getReplicaSetsWithSameController(rs *apps.Repli
relatedRSs = append(relatedRSs, obj.(*apps.ReplicaSet))
}
if klogV := klog.V(2); klogV.Enabled() {
klogV.InfoS("Found related ReplicaSets", "replicaSet", klog.KObj(rs), "relatedReplicaSets", klog.KObjSlice(relatedRSs))
if klogV := logger.V(2); klogV.Enabled() {
klogV.Info("Found related ReplicaSets", "replicaSet", klog.KObj(rs), "relatedReplicaSets", klog.KObjSlice(relatedRSs))
}
return relatedRSs
@ -568,7 +568,7 @@ func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPod
// into a performance bottleneck. We should generate a UID for the pod
// beforehand and store it via ExpectCreations.
rsc.expectations.ExpectCreations(rsKey, diff)
klog.V(2).InfoS("Too few replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "creating", diff)
klog.FromContext(ctx).V(2).Info("Too few replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "creating", diff)
// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
// and double with each successful iteration in a kind of "slow start".
// This handles attempts to start large numbers of pods that would
@ -593,7 +593,7 @@ func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPod
// The skipped pods will be retried later. The next controller resync will
// retry the slow start process.
if skippedPods := diff - successfulCreations; skippedPods > 0 {
klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for %v %v/%v", skippedPods, rsc.Kind, rs.Namespace, rs.Name)
klog.FromContext(ctx).V(2).Info("Slow-start failure. Skipping creation of pods, decrementing expectations", "podsSkipped", skippedPods, "kind", rsc.Kind, "replicaSet", klog.KObj(rs))
for i := 0; i < skippedPods; i++ {
// Decrement the expected number of creates because the informer won't observe this pod
rsc.expectations.CreationObserved(rsKey)
@ -604,9 +604,9 @@ func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPod
if diff > rsc.burstReplicas {
diff = rsc.burstReplicas
}
klog.V(2).InfoS("Too many replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "deleting", diff)
klog.FromContext(ctx).V(2).Info("Too many replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "deleting", diff)
relatedPods, err := rsc.getIndirectlyRelatedPods(rs)
relatedPods, err := rsc.getIndirectlyRelatedPods(klog.FromContext(ctx), rs)
utilruntime.HandleError(err)
// Choose which Pods to delete, preferring those in earlier phases of startup.
@ -631,7 +631,7 @@ func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPod
podKey := controller.PodKey(targetPod)
rsc.expectations.DeletionObserved(rsKey, podKey)
if !apierrors.IsNotFound(err) {
klog.V(2).Infof("Failed to delete %v, decremented expectations for %v %s/%s", podKey, rsc.Kind, rs.Namespace, rs.Name)
klog.FromContext(ctx).V(2).Info("Failed to delete pod, decremented expectations", "pod", podKey, "kind", rsc.Kind, "replicaSet", klog.KObj(rs))
errCh <- err
}
}
@ -658,7 +658,7 @@ func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPod
func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string) error {
startTime := time.Now()
defer func() {
klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime))
klog.FromContext(ctx).V(4).Info("Finished syncing", "kind", rsc.Kind, "key", key, "duration", time.Since(startTime))
}()
namespace, name, err := cache.SplitMetaNamespaceKey(key)
@ -667,7 +667,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string)
}
rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
if apierrors.IsNotFound(err) {
klog.V(4).Infof("%v %v has been deleted", rsc.Kind, key)
klog.FromContext(ctx).V(4).Info("deleted", "kind", rsc.Kind, "key", key)
rsc.expectations.DeleteExpectations(key)
return nil
}
@ -707,7 +707,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string)
newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
// Always updates status as pods come up or die.
updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
updatedRS, err := updateReplicaSetStatus(klog.FromContext(ctx), rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
if err != nil {
// Multiple things could lead to this update failing. Requeuing the replica set ensures
// Returning an error causes a requeue without forcing a hotloop
@ -778,10 +778,10 @@ func slowStartBatch(count int, initialBatchSize int, fn func() error) (int, erro
// getIndirectlyRelatedPods returns all pods that are owned by any ReplicaSet
// that is owned by the given ReplicaSet's owner.
func (rsc *ReplicaSetController) getIndirectlyRelatedPods(rs *apps.ReplicaSet) ([]*v1.Pod, error) {
func (rsc *ReplicaSetController) getIndirectlyRelatedPods(logger klog.Logger, rs *apps.ReplicaSet) ([]*v1.Pod, error) {
var relatedPods []*v1.Pod
seen := make(map[types.UID]*apps.ReplicaSet)
for _, relatedRS := range rsc.getReplicaSetsWithSameController(rs) {
for _, relatedRS := range rsc.getReplicaSetsWithSameController(logger, rs) {
selector, err := metav1.LabelSelectorAsSelector(relatedRS.Spec.Selector)
if err != nil {
// This object has an invalid selector, it does not match any pods
@ -793,14 +793,14 @@ func (rsc *ReplicaSetController) getIndirectlyRelatedPods(rs *apps.ReplicaSet) (
}
for _, pod := range pods {
if otherRS, found := seen[pod.UID]; found {
klog.V(5).Infof("Pod %s/%s is owned by both %v %s/%s and %v %s/%s", pod.Namespace, pod.Name, rsc.Kind, otherRS.Namespace, otherRS.Name, rsc.Kind, relatedRS.Namespace, relatedRS.Name)
logger.V(5).Info("Pod is owned by both", "pod", klog.KObj(pod), "kind", rsc.Kind, "replicaSets", klog.KObjSlice([]klog.KMetadata{otherRS, relatedRS}))
continue
}
seen[pod.UID] = relatedRS
relatedPods = append(relatedPods, pod)
}
}
klog.V(4).InfoS("Found related pods", "kind", rsc.Kind, "object", klog.KObj(rs), "pods", klog.KObjSlice(relatedPods))
logger.V(4).Info("Found related pods", "kind", rsc.Kind, "replicaSet", klog.KObj(rs), "pods", klog.KObjSlice(relatedPods))
return relatedPods, nil
}

View File

@ -49,7 +49,7 @@ import (
"k8s.io/client-go/tools/cache"
utiltesting "k8s.io/client-go/util/testing"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/controller"
. "k8s.io/kubernetes/pkg/controller/testutil"
"k8s.io/kubernetes/pkg/securitycontext"
@ -60,10 +60,12 @@ var (
informerSyncTimeout = 30 * time.Second
)
func testNewReplicaSetControllerFromClient(client clientset.Interface, stopCh chan struct{}, burstReplicas int) (*ReplicaSetController, informers.SharedInformerFactory) {
func testNewReplicaSetControllerFromClient(tb testing.TB, client clientset.Interface, stopCh chan struct{}, burstReplicas int) (*ReplicaSetController, informers.SharedInformerFactory) {
informers := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
logger, _ := ktesting.NewTestContext(tb)
ret := NewReplicaSetController(
logger,
informers.Apps().V1().ReplicaSets(),
informers.Core().V1().Pods(),
client,
@ -217,7 +219,7 @@ func TestSyncReplicaSetDoesNothing(t *testing.T) {
fakePodControl := controller.FakePodControl{}
stopCh := make(chan struct{})
defer close(stopCh)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas)
manager, informers := testNewReplicaSetControllerFromClient(t, client, stopCh, BurstReplicas)
// 2 running pods, a controller with 2 replicas, sync is a no-op
labelMap := map[string]string{"foo": "bar"}
@ -238,7 +240,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) {
fakePodControl := controller.FakePodControl{}
stopCh := make(chan struct{})
defer close(stopCh)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas)
manager, informers := testNewReplicaSetControllerFromClient(t, client, stopCh, BurstReplicas)
manager.podControl = &fakePodControl
received := make(chan string)
@ -279,7 +281,7 @@ func TestSyncReplicaSetCreateFailures(t *testing.T) {
client := fake.NewSimpleClientset(rs)
stopCh := make(chan struct{})
defer close(stopCh)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas)
manager, informers := testNewReplicaSetControllerFromClient(t, client, stopCh, BurstReplicas)
informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rs)
@ -313,7 +315,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) {
fakePodControl := controller.FakePodControl{}
stopCh := make(chan struct{})
defer close(stopCh)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas)
manager, informers := testNewReplicaSetControllerFromClient(t, client, stopCh, BurstReplicas)
manager.podControl = &fakePodControl
@ -391,10 +393,11 @@ func TestGetReplicaSetsWithSameController(t *testing.T) {
pendingDeletionRS.ObjectMeta.OwnerReferences[0].UID = "789"
now := metav1.Now()
pendingDeletionRS.DeletionTimestamp = &now
logger, _ := ktesting.NewTestContext(t)
stopCh := make(chan struct{})
defer close(stopCh)
manager, informers := testNewReplicaSetControllerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}), stopCh, BurstReplicas)
manager, informers := testNewReplicaSetControllerFromClient(t, clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}), stopCh, BurstReplicas)
testCases := []struct {
name string
rss []*apps.ReplicaSet
@ -424,7 +427,7 @@ func TestGetReplicaSetsWithSameController(t *testing.T) {
for _, r := range c.rss {
informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(r)
}
actualRSs := manager.getReplicaSetsWithSameController(c.rs)
actualRSs := manager.getReplicaSetsWithSameController(logger, c.rs)
var actualRSNames, expectedRSNames []string
for _, r := range actualRSs {
actualRSNames = append(actualRSNames, r.Name)
@ -443,7 +446,8 @@ func TestGetReplicaSetsWithSameController(t *testing.T) {
func BenchmarkGetReplicaSetsWithSameController(b *testing.B) {
stopCh := make(chan struct{})
defer close(stopCh)
controller, informers := testNewReplicaSetControllerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}), stopCh, BurstReplicas)
controller, informers := testNewReplicaSetControllerFromClient(b, clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}), stopCh, BurstReplicas)
logger, _ := ktesting.NewTestContext(b)
targetRS := newReplicaSet(1, map[string]string{"foo": "bar"})
targetRS.Name = "rs1"
@ -463,7 +467,7 @@ func BenchmarkGetReplicaSetsWithSameController(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
gotRSs := controller.getReplicaSetsWithSameController(targetRS)
gotRSs := controller.getReplicaSetsWithSameController(logger, targetRS)
if len(gotRSs) != 2 {
b.Errorf("Incorrect ReplicaSets number, expected 2, got: %d", len(gotRSs))
}
@ -473,7 +477,7 @@ func BenchmarkGetReplicaSetsWithSameController(b *testing.B) {
func TestPodControllerLookup(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
manager, informers := testNewReplicaSetControllerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}), stopCh, BurstReplicas)
manager, informers := testNewReplicaSetControllerFromClient(t, clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}), stopCh, BurstReplicas)
testCases := []struct {
inRSs []*apps.ReplicaSet
pod *v1.Pod
@ -553,10 +557,11 @@ func TestRelatedPodsLookup(t *testing.T) {
pod2 := newPod("pod2", someRS, v1.PodRunning, nil, true)
pod3 := newPod("pod3", relatedRS, v1.PodRunning, nil, true)
pod4 := newPod("pod4", unrelatedRS, v1.PodRunning, nil, true)
logger, _ := ktesting.NewTestContext(t)
stopCh := make(chan struct{})
defer close(stopCh)
manager, informers := testNewReplicaSetControllerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}), stopCh, BurstReplicas)
manager, informers := testNewReplicaSetControllerFromClient(t, clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}), stopCh, BurstReplicas)
testCases := []struct {
name string
rss []*apps.ReplicaSet
@ -594,7 +599,7 @@ func TestRelatedPodsLookup(t *testing.T) {
informers.Core().V1().Pods().Informer().GetIndexer().Add(pod)
manager.addPod(pod)
}
actualPods, err := manager.getIndirectlyRelatedPods(c.rs)
actualPods, err := manager.getIndirectlyRelatedPods(logger, c.rs)
if err != nil {
t.Errorf("Unexpected error from getIndirectlyRelatedPods: %v", err)
}
@ -617,7 +622,9 @@ func TestWatchControllers(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
informers := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
logger, _ := ktesting.NewTestContext(t)
manager := NewReplicaSetController(
logger,
informers.Apps().V1().ReplicaSets(),
informers.Core().V1().Pods(),
client,
@ -667,7 +674,7 @@ func TestWatchPods(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas)
manager, informers := testNewReplicaSetControllerFromClient(t, client, stopCh, BurstReplicas)
// Put one ReplicaSet into the shared informer
labelMap := map[string]string{"foo": "bar"}
@ -713,7 +720,7 @@ func TestWatchPods(t *testing.T) {
func TestUpdatePods(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
manager, informers := testNewReplicaSetControllerFromClient(fake.NewSimpleClientset(), stopCh, BurstReplicas)
manager, informers := testNewReplicaSetControllerFromClient(t, fake.NewSimpleClientset(), stopCh, BurstReplicas)
received := make(chan string)
@ -849,7 +856,7 @@ func TestControllerUpdateRequeue(t *testing.T) {
})
stopCh := make(chan struct{})
defer close(stopCh)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas)
manager, informers := testNewReplicaSetControllerFromClient(t, client, stopCh, BurstReplicas)
informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rs)
rs.Status = apps.ReplicaSetStatus{Replicas: 2}
@ -878,7 +885,8 @@ func TestControllerUpdateStatusWithFailure(t *testing.T) {
fakeRSClient := fakeClient.AppsV1().ReplicaSets("default")
numReplicas := int32(10)
newStatus := apps.ReplicaSetStatus{Replicas: numReplicas}
updateReplicaSetStatus(fakeRSClient, rs, newStatus)
logger, _ := ktesting.NewTestContext(t)
updateReplicaSetStatus(logger, fakeRSClient, rs, newStatus)
updates, gets := 0, 0
for _, a := range fakeClient.Actions() {
if a.GetResource().Resource != "replicasets" {
@ -921,7 +929,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
fakePodControl := controller.FakePodControl{}
stopCh := make(chan struct{})
defer close(stopCh)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, burstReplicas)
manager, informers := testNewReplicaSetControllerFromClient(t, client, stopCh, burstReplicas)
manager.podControl = &fakePodControl
informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rsSpec)
@ -1089,7 +1097,7 @@ func TestRSSyncExpectations(t *testing.T) {
fakePodControl := controller.FakePodControl{}
stopCh := make(chan struct{})
defer close(stopCh)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, 2)
manager, informers := testNewReplicaSetControllerFromClient(t, client, stopCh, 2)
manager.podControl = &fakePodControl
labelMap := map[string]string{"foo": "bar"}
@ -1119,7 +1127,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
client := fake.NewSimpleClientset(rs)
stopCh := make(chan struct{})
defer close(stopCh)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, 10)
manager, informers := testNewReplicaSetControllerFromClient(t, client, stopCh, 10)
informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rs)
@ -1174,7 +1182,9 @@ func TestExpectationsOnRecreate(t *testing.T) {
defer close(stopCh)
f := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
logger, _ := ktesting.NewTestContext(t)
manager := NewReplicaSetController(
logger,
f.Apps().V1().ReplicaSets(),
f.Core().V1().Pods(),
client,
@ -1196,7 +1206,7 @@ func TestExpectationsOnRecreate(t *testing.T) {
}
err = wait.PollImmediate(100*time.Millisecond, informerSyncTimeout, func() (bool, error) {
klog.V(8).Infof("Waiting for queue to have 1 item, currently has: %d", manager.queue.Len())
logger.V(8).Info("Waiting for queue to have 1 item", "length", manager.queue.Len())
return manager.queue.Len() == 1, nil
})
if err != nil {
@ -1240,7 +1250,7 @@ func TestExpectationsOnRecreate(t *testing.T) {
}
err = wait.PollImmediate(100*time.Millisecond, informerSyncTimeout, func() (bool, error) {
klog.V(8).Infof("Waiting for queue to have 1 item, currently has: %d", manager.queue.Len())
logger.V(8).Info("Waiting for queue to have 1 item", "length", manager.queue.Len())
return manager.queue.Len() == 1, nil
})
if err != nil {
@ -1282,7 +1292,7 @@ func TestExpectationsOnRecreate(t *testing.T) {
}
err = wait.PollImmediate(100*time.Millisecond, informerSyncTimeout, func() (bool, error) {
klog.V(8).Infof("Waiting for queue to have 1 item, currently has: %d", manager.queue.Len())
logger.V(8).Info("Waiting for queue to have 1 item", "length", manager.queue.Len())
return manager.queue.Len() == 1, nil
})
if err != nil {
@ -1333,7 +1343,7 @@ func TestOverlappingRSs(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, 10)
manager, informers := testNewReplicaSetControllerFromClient(t, client, stopCh, 10)
// Create 10 ReplicaSets, shuffled them randomly and insert them into the
// ReplicaSet controller's store.
@ -1375,7 +1385,7 @@ func TestDeletionTimestamp(t *testing.T) {
labelMap := map[string]string{"foo": "bar"}
stopCh := make(chan struct{})
defer close(stopCh)
manager, informers := testNewReplicaSetControllerFromClient(c, stopCh, 10)
manager, informers := testNewReplicaSetControllerFromClient(t, c, stopCh, 10)
rs := newReplicaSet(1, labelMap)
informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rs)
@ -1467,10 +1477,10 @@ func TestDeletionTimestamp(t *testing.T) {
}
// setupManagerWithGCEnabled creates a RS manager with a fakePodControl
func setupManagerWithGCEnabled(stopCh chan struct{}, objs ...runtime.Object) (manager *ReplicaSetController, fakePodControl *controller.FakePodControl, informers informers.SharedInformerFactory) {
func setupManagerWithGCEnabled(t *testing.T, stopCh chan struct{}, objs ...runtime.Object) (manager *ReplicaSetController, fakePodControl *controller.FakePodControl, informers informers.SharedInformerFactory) {
c := fake.NewSimpleClientset(objs...)
fakePodControl = &controller.FakePodControl{}
manager, informers = testNewReplicaSetControllerFromClient(c, stopCh, BurstReplicas)
manager, informers = testNewReplicaSetControllerFromClient(t, c, stopCh, BurstReplicas)
manager.podControl = fakePodControl
return manager, fakePodControl, informers
@ -1481,7 +1491,7 @@ func TestDoNotPatchPodWithOtherControlRef(t *testing.T) {
rs := newReplicaSet(2, labelMap)
stopCh := make(chan struct{})
defer close(stopCh)
manager, fakePodControl, informers := setupManagerWithGCEnabled(stopCh, rs)
manager, fakePodControl, informers := setupManagerWithGCEnabled(t, stopCh, rs)
informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rs)
var trueVar = true
otherControllerReference := metav1.OwnerReference{UID: uuid.NewUUID(), APIVersion: "v1beta1", Kind: "ReplicaSet", Name: "AnotherRS", Controller: &trueVar}
@ -1505,7 +1515,7 @@ func TestPatchPodFails(t *testing.T) {
rs := newReplicaSet(2, labelMap)
stopCh := make(chan struct{})
defer close(stopCh)
manager, fakePodControl, informers := setupManagerWithGCEnabled(stopCh, rs)
manager, fakePodControl, informers := setupManagerWithGCEnabled(t, stopCh, rs)
informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rs)
// add to podLister two matching pods. Expect two patches to take control
// them.
@ -1540,7 +1550,7 @@ func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) {
rs.DeletionTimestamp = &now
stopCh := make(chan struct{})
defer close(stopCh)
manager, fakePodControl, informers := setupManagerWithGCEnabled(stopCh, rs)
manager, fakePodControl, informers := setupManagerWithGCEnabled(t, stopCh, rs)
informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rs)
pod1 := newPod("pod1", rs, v1.PodRunning, nil, false)
informers.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
@ -1564,7 +1574,7 @@ func TestDoNotAdoptOrCreateIfBeingDeletedRace(t *testing.T) {
rs.DeletionTimestamp = &now
stopCh := make(chan struct{})
defer close(stopCh)
manager, fakePodControl, informers := setupManagerWithGCEnabled(stopCh, rs)
manager, fakePodControl, informers := setupManagerWithGCEnabled(t, stopCh, rs)
// Lister (cache) says it's NOT deleted.
rs2 := *rs
rs2.DeletionTimestamp = nil

View File

@ -26,7 +26,7 @@ import (
"k8s.io/klog/v2"
apps "k8s.io/api/apps/v1"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
appsclient "k8s.io/client-go/kubernetes/typed/apps/v1"
@ -34,7 +34,7 @@ import (
)
// updateReplicaSetStatus attempts to update the Status.Replicas of the given ReplicaSet, with a single GET/PUT retry.
func updateReplicaSetStatus(c appsclient.ReplicaSetInterface, rs *apps.ReplicaSet, newStatus apps.ReplicaSetStatus) (*apps.ReplicaSet, error) {
func updateReplicaSetStatus(logger klog.Logger, c appsclient.ReplicaSetInterface, rs *apps.ReplicaSet, newStatus apps.ReplicaSetStatus) (*apps.ReplicaSet, error) {
// This is the steady state. It happens when the ReplicaSet doesn't have any expectations, since
// we do a periodic relist every 30s. If the generations differ but the replicas are
// the same, a caller might've resized to the same replica count.
@ -56,7 +56,7 @@ func updateReplicaSetStatus(c appsclient.ReplicaSetInterface, rs *apps.ReplicaSe
var getErr, updateErr error
var updatedRS *apps.ReplicaSet
for i, rs := 0, rs; ; i++ {
klog.V(4).Infof(fmt.Sprintf("Updating status for %v: %s/%s, ", rs.Kind, rs.Namespace, rs.Name) +
logger.V(4).Info(fmt.Sprintf("Updating status for %v: %s/%s, ", rs.Kind, rs.Namespace, rs.Name) +
fmt.Sprintf("replicas %d->%d (need %d), ", rs.Status.Replicas, newStatus.Replicas, *(rs.Spec.Replicas)) +
fmt.Sprintf("fullyLabeledReplicas %d->%d, ", rs.Status.FullyLabeledReplicas, newStatus.FullyLabeledReplicas) +
fmt.Sprintf("readyReplicas %d->%d, ", rs.Status.ReadyReplicas, newStatus.ReadyReplicas) +

View File

@ -26,7 +26,7 @@ limitations under the License.
package replication
import (
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"

View File

@ -24,13 +24,14 @@ import (
"time"
apps "k8s.io/api/apps/v1"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/klog/v2/ktesting"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller/deployment"
@ -105,6 +106,7 @@ func newDeployment(name, ns string, replicas int32) *apps.Deployment {
func dcSetup(t *testing.T) (kubeapiservertesting.TearDownFunc, *replicaset.ReplicaSetController, *deployment.DeploymentController, informers.SharedInformerFactory, clientset.Interface) {
// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
logger, _ := ktesting.NewTestContext(t)
config := restclient.CopyConfig(server.ClientConfig)
clientSet, err := clientset.NewForConfig(config)
@ -124,6 +126,7 @@ func dcSetup(t *testing.T) (kubeapiservertesting.TearDownFunc, *replicaset.Repli
t.Fatalf("error creating Deployment controller: %v", err)
}
rm := replicaset.NewReplicaSetController(
logger,
informers.Apps().V1().ReplicaSets(),
informers.Core().V1().Pods(),
clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "replicaset-controller")),

View File

@ -25,7 +25,7 @@ import (
"time"
apps "k8s.io/api/apps/v1"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@ -40,6 +40,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/retry"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2/ktesting"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/apis/core"
@ -128,8 +129,10 @@ func rmSetup(t *testing.T) (kubeapiservertesting.TearDownFunc, *replicaset.Repli
}
resyncPeriod := 12 * time.Hour
informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "rs-informers")), resyncPeriod)
logger, _ := ktesting.NewTestContext(t)
rm := replicaset.NewReplicaSetController(
logger,
informers.Apps().V1().ReplicaSets(),
informers.Core().V1().Pods(),
clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "replicaset-controller")),