kube-controller-manager: readjust log verbosity

- Increase the verbosity level of the broadcaster's structured logging to 3 so that users can suppress event messages by lowering the logging level, reducing information noise.
- Ensure the context is properly injected into the broadcaster so that the -v flag value also applies to that broadcaster, rather than the global value above.
- test: use cancellation from ktesting
- golangci-hints: checked error return value
This commit is contained in:
Mengjiao Liu
2023-12-13 16:11:08 +08:00
parent 1b07df8845
commit b584b87a94
66 changed files with 770 additions and 649 deletions

View File

@@ -106,7 +106,7 @@ type AttachDetachController interface {
// NewAttachDetachController returns a new instance of AttachDetachController.
func NewAttachDetachController(
logger klog.Logger,
ctx context.Context,
kubeClient clientset.Interface,
podInformer coreinformers.PodInformer,
nodeInformer coreinformers.NodeInformer,
@@ -123,6 +123,8 @@ func NewAttachDetachController(
disableForceDetachOnTimeout bool,
timerConfig TimerConfig) (AttachDetachController, error) {
logger := klog.FromContext(ctx)
adc := &attachDetachController{
kubeClient: kubeClient,
pvcLister: pvcInformer.Lister(),
@@ -151,7 +153,7 @@ func NewAttachDetachController(
return nil, fmt.Errorf("could not initialize volume plugins for Attach/Detach Controller: %w", err)
}
adc.broadcaster = record.NewBroadcaster()
adc.broadcaster = record.NewBroadcaster(record.WithContext(ctx))
recorder := adc.broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "attachdetach-controller"})
blkutil := volumepathhandler.NewBlockVolumePathHandler()
@@ -332,7 +334,7 @@ func (adc *attachDetachController) Run(ctx context.Context) {
defer adc.pvcQueue.ShutDown()
// Start events processing pipeline.
adc.broadcaster.StartStructuredLogging(0)
adc.broadcaster.StartStructuredLogging(3)
adc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: adc.kubeClient.CoreV1().Events("")})
defer adc.broadcaster.Shutdown()

View File

@@ -30,13 +30,13 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
kcache "k8s.io/client-go/tools/cache"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csi"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/test/utils/ktesting"
)
const (
@@ -50,9 +50,9 @@ func Test_NewAttachDetachController_Positive(t *testing.T) {
informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
// Act
logger, _ := ktesting.NewTestContext(t)
tCtx := ktesting.Init(t)
_, err := NewAttachDetachController(
logger,
tCtx,
fakeKubeClient,
informerFactory.Core().V1().Pods(),
informerFactory.Core().V1().Nodes(),
@@ -81,11 +81,9 @@ func Test_AttachDetachControllerStateOfWorldPopulators_Positive(t *testing.T) {
fakeKubeClient := controllervolumetesting.CreateTestClient()
informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
logger, tCtx := ktesting.NewTestContext(t)
adcObj, err := NewAttachDetachController(
logger,
tCtx,
fakeKubeClient,
informerFactory.Core().V1().Pods(),
informerFactory.Core().V1().Nodes(),
@@ -109,8 +107,8 @@ func Test_AttachDetachControllerStateOfWorldPopulators_Positive(t *testing.T) {
adc := adcObj.(*attachDetachController)
// Act
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
informerFactory.Start(tCtx.Done())
informerFactory.WaitForCacheSync(tCtx.Done())
err = adc.populateActualStateOfWorld(logger)
if err != nil {
@@ -206,11 +204,9 @@ func BenchmarkPopulateActualStateOfWorld(b *testing.B) {
}
informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
logger, ctx := ktesting.NewTestContext(b)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
logger, tCtx := ktesting.NewTestContext(b)
adcObj, err := NewAttachDetachController(
logger,
tCtx,
fakeKubeClient,
informerFactory.Core().V1().Pods(),
informerFactory.Core().V1().Nodes(),
@@ -234,8 +230,8 @@ func BenchmarkPopulateActualStateOfWorld(b *testing.B) {
adc := adcObj.(*attachDetachController)
// Act
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
informerFactory.Start(tCtx.Done())
informerFactory.WaitForCacheSync(tCtx.Done())
b.ResetTimer()
err = adc.populateActualStateOfWorld(logger)
@@ -267,11 +263,9 @@ func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2
var podsNum, extraPodsNum, nodesNum, i int
// Create the controller
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
logger, tCtx := ktesting.NewTestContext(t)
adcObj, err := NewAttachDetachController(
logger,
tCtx,
fakeKubeClient,
informerFactory.Core().V1().Pods(),
informerFactory.Core().V1().Nodes(),
@@ -295,7 +289,7 @@ func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2
adc := adcObj.(*attachDetachController)
pods, err := fakeKubeClient.CoreV1().Pods(v1.NamespaceAll).List(context.TODO(), metav1.ListOptions{})
pods, err := fakeKubeClient.CoreV1().Pods(v1.NamespaceAll).List(tCtx, metav1.ListOptions{})
if err != nil {
t.Fatalf("Run failed with error. Expected: <no error> Actual: %v", err)
}
@@ -305,7 +299,7 @@ func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2
podInformer.GetIndexer().Add(&podToAdd)
podsNum++
}
nodes, err := fakeKubeClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
nodes, err := fakeKubeClient.CoreV1().Nodes().List(tCtx, metav1.ListOptions{})
if err != nil {
t.Fatalf("Run failed with error. Expected: <no error> Actual: %v", err)
}
@@ -315,7 +309,7 @@ func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2
nodesNum++
}
csiNodes, err := fakeKubeClient.StorageV1().CSINodes().List(context.TODO(), metav1.ListOptions{})
csiNodes, err := fakeKubeClient.StorageV1().CSINodes().List(tCtx, metav1.ListOptions{})
if err != nil {
t.Fatalf("Run failed with error. Expected: <no error> Actual: %v", err)
}
@@ -324,9 +318,9 @@ func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2
csiNodeInformer.GetIndexer().Add(&csiNodeToAdd)
}
informerFactory.Start(ctx.Done())
informerFactory.Start(tCtx.Done())
if !kcache.WaitForNamedCacheSync("attach detach", ctx.Done(),
if !kcache.WaitForNamedCacheSync("attach detach", tCtx.Done(),
informerFactory.Core().V1().Pods().Informer().HasSynced,
informerFactory.Core().V1().Nodes().Informer().HasSynced,
informerFactory.Storage().V1().CSINodes().Informer().HasSynced) {
@@ -382,7 +376,7 @@ func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2
for _, newPod := range extraPods1 {
// Add a new pod between ASW and DSW populators
_, err = adc.kubeClient.CoreV1().Pods(newPod.ObjectMeta.Namespace).Create(context.TODO(), newPod, metav1.CreateOptions{})
_, err = adc.kubeClient.CoreV1().Pods(newPod.ObjectMeta.Namespace).Create(tCtx, newPod, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Run failed with error. Failed to create a new pod: <%v>", err)
}
@@ -399,7 +393,7 @@ func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2
for _, newPod := range extraPods2 {
// Add a new pod between DSW populator and reconciler run
_, err = adc.kubeClient.CoreV1().Pods(newPod.ObjectMeta.Namespace).Create(context.TODO(), newPod, metav1.CreateOptions{})
_, err = adc.kubeClient.CoreV1().Pods(newPod.ObjectMeta.Namespace).Create(tCtx, newPod, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Run failed with error. Failed to create a new pod: <%v>", err)
}
@@ -407,8 +401,8 @@ func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2
podInformer.GetIndexer().Add(newPod)
}
go adc.reconciler.Run(ctx)
go adc.desiredStateOfWorldPopulator.Run(ctx)
go adc.reconciler.Run(tCtx)
go adc.desiredStateOfWorldPopulator.Run(tCtx)
time.Sleep(time.Second * 1) // Wait so the reconciler calls sync at least once
@@ -533,11 +527,9 @@ func volumeAttachmentRecoveryTestCase(t *testing.T, tc vaTest) {
vaInformer := informerFactory.Storage().V1().VolumeAttachments().Informer()
// Create the controller
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
logger, tCtx := ktesting.NewTestContext(t)
adcObj, err := NewAttachDetachController(
logger,
tCtx,
fakeKubeClient,
informerFactory.Core().V1().Pods(),
informerFactory.Core().V1().Nodes(),
@@ -560,7 +552,7 @@ func volumeAttachmentRecoveryTestCase(t *testing.T, tc vaTest) {
adc := adcObj.(*attachDetachController)
// Add existing objects (created by testplugin) to the respective informers
pods, err := fakeKubeClient.CoreV1().Pods(v1.NamespaceAll).List(context.TODO(), metav1.ListOptions{})
pods, err := fakeKubeClient.CoreV1().Pods(v1.NamespaceAll).List(tCtx, metav1.ListOptions{})
if err != nil {
t.Fatalf("Run failed with error. Expected: <no error> Actual: %v", err)
}
@@ -568,7 +560,7 @@ func volumeAttachmentRecoveryTestCase(t *testing.T, tc vaTest) {
podToAdd := pod
podInformer.GetIndexer().Add(&podToAdd)
}
nodes, err := fakeKubeClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
nodes, err := fakeKubeClient.CoreV1().Nodes().List(tCtx, metav1.ListOptions{})
if err != nil {
t.Fatalf("Run failed with error. Expected: <no error> Actual: %v", err)
}
@@ -597,7 +589,7 @@ func volumeAttachmentRecoveryTestCase(t *testing.T, tc vaTest) {
},
},
}
_, err = adc.kubeClient.CoreV1().Nodes().Update(context.TODO(), newNode, metav1.UpdateOptions{})
_, err = adc.kubeClient.CoreV1().Nodes().Update(tCtx, newNode, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("Run failed with error. Failed to create a new pod: <%v>", err)
}
@@ -606,7 +598,7 @@ func volumeAttachmentRecoveryTestCase(t *testing.T, tc vaTest) {
// Create and add objects requested by the test
if tc.podName != "" {
newPod := controllervolumetesting.NewPodWithVolume(tc.podName, tc.volName, tc.podNodeName)
_, err = adc.kubeClient.CoreV1().Pods(newPod.ObjectMeta.Namespace).Create(context.TODO(), newPod, metav1.CreateOptions{})
_, err = adc.kubeClient.CoreV1().Pods(newPod.ObjectMeta.Namespace).Create(tCtx, newPod, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Run failed with error. Failed to create a new pod: <%v>", err)
}
@@ -621,7 +613,7 @@ func volumeAttachmentRecoveryTestCase(t *testing.T, tc vaTest) {
// Otherwise use NFS, which is not subject to migration.
newPv = controllervolumetesting.NewNFSPV(tc.pvName, tc.volName)
}
_, err = adc.kubeClient.CoreV1().PersistentVolumes().Create(context.TODO(), newPv, metav1.CreateOptions{})
_, err = adc.kubeClient.CoreV1().PersistentVolumes().Create(tCtx, newPv, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Run failed with error. Failed to create a new pv: <%v>", err)
}
@@ -629,7 +621,7 @@ func volumeAttachmentRecoveryTestCase(t *testing.T, tc vaTest) {
}
if tc.vaName != "" {
newVa := controllervolumetesting.NewVolumeAttachment(tc.vaName, tc.pvName, tc.vaNodeName, tc.vaAttachStatus)
_, err = adc.kubeClient.StorageV1().VolumeAttachments().Create(context.TODO(), newVa, metav1.CreateOptions{})
_, err = adc.kubeClient.StorageV1().VolumeAttachments().Create(tCtx, newVa, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Run failed with error. Failed to create a new volumeAttachment: <%v>", err)
}
@@ -637,9 +629,9 @@ func volumeAttachmentRecoveryTestCase(t *testing.T, tc vaTest) {
}
// Make sure the informer cache is synced
informerFactory.Start(ctx.Done())
informerFactory.Start(tCtx.Done())
if !kcache.WaitForNamedCacheSync("attach detach", ctx.Done(),
if !kcache.WaitForNamedCacheSync("attach detach", tCtx.Done(),
informerFactory.Core().V1().Pods().Informer().HasSynced,
informerFactory.Core().V1().Nodes().Informer().HasSynced,
informerFactory.Core().V1().PersistentVolumes().Informer().HasSynced,
@@ -659,8 +651,8 @@ func volumeAttachmentRecoveryTestCase(t *testing.T, tc vaTest) {
t.Fatalf("Run failed with error. Expected: <no error> Actual: %v", err)
}
// Run reconciler and DSW populator loops
go adc.reconciler.Run(ctx)
go adc.desiredStateOfWorldPopulator.Run(ctx)
go adc.reconciler.Run(tCtx)
go adc.desiredStateOfWorldPopulator.Run(tCtx)
if tc.csiMigration {
verifyExpectedVolumeState(t, adc, tc)
} else {

View File

@@ -76,6 +76,7 @@ type ephemeralController struct {
// NewController creates an ephemeral volume controller.
func NewController(
ctx context.Context,
kubeClient clientset.Interface,
podInformer coreinformers.PodInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer) (Controller, error) {
@@ -92,7 +93,7 @@ func NewController(
ephemeralvolumemetrics.RegisterMetrics()
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
ec.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "ephemeral_volume"})

View File

@@ -22,6 +22,7 @@ import (
"sort"
"testing"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -35,8 +36,6 @@ import (
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller"
ephemeralvolumemetrics "k8s.io/kubernetes/pkg/controller/volume/ephemeral/metrics"
"github.com/stretchr/testify/assert"
)
var (
@@ -146,7 +145,7 @@ func TestSyncHandler(t *testing.T) {
podInformer := informerFactory.Core().V1().Pods()
pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
c, err := NewController(fakeKubeClient, podInformer, pvcInformer)
c, err := NewController(ctx, fakeKubeClient, podInformer, pvcInformer)
if err != nil {
t.Fatalf("error creating ephemeral controller : %v", err)
}

View File

@@ -100,6 +100,7 @@ type expandController struct {
// NewExpandController expands the pvs
func NewExpandController(
ctx context.Context,
kubeClient clientset.Interface,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
cloud cloudprovider.Interface,
@@ -121,8 +122,8 @@ func NewExpandController(
return nil, fmt.Errorf("could not initialize volume plugins for Expand Controller : %+v", err)
}
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
eventBroadcaster.StartStructuredLogging(3)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
expc.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "volume_expand"})
blkutil := volumepathhandler.NewBlockVolumePathHandler()

View File

@@ -42,6 +42,7 @@ import (
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
"k8s.io/kubernetes/test/utils/ktesting"
)
func TestSyncHandler(t *testing.T) {
@@ -91,6 +92,7 @@ func TestSyncHandler(t *testing.T) {
}
for _, tc := range tests {
tCtx := ktesting.Init(t)
test := tc
fakeKubeClient := controllervolumetesting.CreateTestClient()
informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
@@ -106,7 +108,7 @@ func TestSyncHandler(t *testing.T) {
}
allPlugins := []volume.VolumePlugin{}
translator := csitrans.New()
expc, err := NewExpandController(fakeKubeClient, pvcInformer, nil, allPlugins, translator, csimigration.NewPluginManager(translator, utilfeature.DefaultFeatureGate))
expc, err := NewExpandController(tCtx, fakeKubeClient, pvcInformer, nil, allPlugins, translator, csimigration.NewPluginManager(translator, utilfeature.DefaultFeatureGate))
if err != nil {
t.Fatalf("error creating expand controller : %v", err)
}

View File

@@ -77,7 +77,7 @@ type ControllerParameters struct {
// NewController creates a new PersistentVolume controller
func NewController(ctx context.Context, p ControllerParameters) (*PersistentVolumeController, error) {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
eventRecorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "persistentvolume-controller"})
controller := &PersistentVolumeController{
@@ -305,7 +305,7 @@ func (ctrl *PersistentVolumeController) Run(ctx context.Context) {
defer ctrl.volumeQueue.ShutDown()
// Start events processing pipeline.
ctrl.eventBroadcaster.StartStructuredLogging(0)
ctrl.eventBroadcaster.StartStructuredLogging(3)
ctrl.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: ctrl.kubeClient.CoreV1().Events("")})
defer ctrl.eventBroadcaster.Shutdown()