mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-19 18:02:01 +00:00
Merge pull request #113986 from songxiao-wang87/runwxs-test2
Migrate StorageVersionGC to contextual logging
This commit is contained in:
commit
4aaa4df840
@ -689,7 +689,9 @@ func setNodeCIDRMaskSizes(cfg nodeipamconfig.NodeIPAMControllerConfiguration, cl
|
||||
}
|
||||
|
||||
func startStorageVersionGCController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||
ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "storageVersionGC"))
|
||||
go storageversiongc.NewStorageVersionGC(
|
||||
ctx,
|
||||
controllerContext.ClientBuilder.ClientOrDie("storage-version-garbage-collector"),
|
||||
controllerContext.InformerFactory.Coordination().V1().Leases(),
|
||||
controllerContext.InformerFactory.Internal().V1alpha1().StorageVersions(),
|
||||
|
@ -55,7 +55,7 @@ type Controller struct {
|
||||
}
|
||||
|
||||
// NewStorageVersionGC creates a new Controller.
|
||||
func NewStorageVersionGC(clientset kubernetes.Interface, leaseInformer coordinformers.LeaseInformer, storageVersionInformer apiserverinternalinformers.StorageVersionInformer) *Controller {
|
||||
func NewStorageVersionGC(ctx context.Context, clientset kubernetes.Interface, leaseInformer coordinformers.LeaseInformer, storageVersionInformer apiserverinternalinformers.StorageVersionInformer) *Controller {
|
||||
c := &Controller{
|
||||
kubeclientset: clientset,
|
||||
leaseLister: leaseInformer.Lister(),
|
||||
@ -64,14 +64,20 @@ func NewStorageVersionGC(clientset kubernetes.Interface, leaseInformer coordinfo
|
||||
leaseQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "storage_version_garbage_collector_leases"),
|
||||
storageVersionQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "storage_version_garbage_collector_storageversions"),
|
||||
}
|
||||
|
||||
logger := klog.FromContext(ctx)
|
||||
leaseInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
DeleteFunc: c.onDeleteLease,
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
c.onDeleteLease(logger, obj)
|
||||
},
|
||||
})
|
||||
// use the default resync period from the informer
|
||||
storageVersionInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: c.onAddStorageVersion,
|
||||
UpdateFunc: c.onUpdateStorageVersion,
|
||||
AddFunc: func(obj interface{}) {
|
||||
c.onAddStorageVersion(logger, obj)
|
||||
},
|
||||
UpdateFunc: func(old, newObj interface{}) {
|
||||
c.onUpdateStorageVersion(logger, old, newObj)
|
||||
},
|
||||
})
|
||||
|
||||
return c
|
||||
@ -79,12 +85,13 @@ func NewStorageVersionGC(clientset kubernetes.Interface, leaseInformer coordinfo
|
||||
|
||||
// Run starts one worker.
|
||||
func (c *Controller) Run(ctx context.Context) {
|
||||
logger := klog.FromContext(ctx)
|
||||
defer utilruntime.HandleCrash()
|
||||
defer c.leaseQueue.ShutDown()
|
||||
defer c.storageVersionQueue.ShutDown()
|
||||
defer klog.Infof("Shutting down storage version garbage collector")
|
||||
defer logger.Info("Shutting down storage version garbage collector")
|
||||
|
||||
klog.Infof("Starting storage version garbage collector")
|
||||
logger.Info("Starting storage version garbage collector")
|
||||
|
||||
if !cache.WaitForCacheSync(ctx.Done(), c.leasesSynced, c.storageVersionSynced) {
|
||||
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
|
||||
@ -215,31 +222,31 @@ func (c *Controller) syncStorageVersion(ctx context.Context, name string) error
|
||||
return c.updateOrDeleteStorageVersion(ctx, sv, serverStorageVersions)
|
||||
}
|
||||
|
||||
func (c *Controller) onAddStorageVersion(obj interface{}) {
|
||||
func (c *Controller) onAddStorageVersion(logger klog.Logger, obj interface{}) {
|
||||
castObj := obj.(*apiserverinternalv1alpha1.StorageVersion)
|
||||
c.enqueueStorageVersion(castObj)
|
||||
c.enqueueStorageVersion(logger, castObj)
|
||||
}
|
||||
|
||||
func (c *Controller) onUpdateStorageVersion(oldObj, newObj interface{}) {
|
||||
func (c *Controller) onUpdateStorageVersion(logger klog.Logger, oldObj, newObj interface{}) {
|
||||
castNewObj := newObj.(*apiserverinternalv1alpha1.StorageVersion)
|
||||
c.enqueueStorageVersion(castNewObj)
|
||||
c.enqueueStorageVersion(logger, castNewObj)
|
||||
}
|
||||
|
||||
// enqueueStorageVersion enqueues the storage version if it has entry for invalid apiserver
|
||||
func (c *Controller) enqueueStorageVersion(obj *apiserverinternalv1alpha1.StorageVersion) {
|
||||
func (c *Controller) enqueueStorageVersion(logger klog.Logger, obj *apiserverinternalv1alpha1.StorageVersion) {
|
||||
for _, sv := range obj.Status.StorageVersions {
|
||||
lease, err := c.leaseLister.Leases(metav1.NamespaceSystem).Get(sv.APIServerID)
|
||||
if err != nil || lease == nil || lease.Labels == nil ||
|
||||
lease.Labels[controlplane.IdentityLeaseComponentLabelKey] != controlplane.KubeAPIServer {
|
||||
// we cannot find a corresponding identity lease in cache, enqueue the storageversion
|
||||
klog.V(4).Infof("Observed storage version %s with invalid apiserver entry", obj.Name)
|
||||
logger.V(4).Info("Observed storage version with invalid apiserver entry", "objName", obj.Name)
|
||||
c.storageVersionQueue.Add(obj.Name)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Controller) onDeleteLease(obj interface{}) {
|
||||
func (c *Controller) onDeleteLease(logger klog.Logger, obj interface{}) {
|
||||
castObj, ok := obj.(*coordinationv1.Lease)
|
||||
if !ok {
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
@ -257,7 +264,7 @@ func (c *Controller) onDeleteLease(obj interface{}) {
|
||||
if castObj.Namespace == metav1.NamespaceSystem &&
|
||||
castObj.Labels != nil &&
|
||||
castObj.Labels[controlplane.IdentityLeaseComponentLabelKey] == controlplane.KubeAPIServer {
|
||||
klog.V(4).Infof("Observed lease %s deleted", castObj.Name)
|
||||
logger.V(4).Info("Observed lease deleted", "castObjName", castObj.Name)
|
||||
c.enqueueLease(castObj)
|
||||
}
|
||||
}
|
||||
|
@ -29,15 +29,16 @@ import (
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
utilpointer "k8s.io/utils/pointer"
|
||||
)
|
||||
|
||||
func setupController(clientset kubernetes.Interface) {
|
||||
func setupController(ctx context.Context, clientset kubernetes.Interface) {
|
||||
informerFactory := informers.NewSharedInformerFactory(clientset, 100*time.Millisecond)
|
||||
leaseInformer := informerFactory.Coordination().V1().Leases()
|
||||
storageVersionInformer := informerFactory.Internal().V1alpha1().StorageVersions()
|
||||
|
||||
controller := NewStorageVersionGC(clientset, leaseInformer, storageVersionInformer)
|
||||
controller := NewStorageVersionGC(ctx, clientset, leaseInformer, storageVersionInformer)
|
||||
go controller.Run(context.Background())
|
||||
informerFactory.Start(nil)
|
||||
}
|
||||
@ -93,7 +94,8 @@ func Test_StorageVersionUpdatedWithAllEncodingVersionsEqualOnLeaseDeletion(t *te
|
||||
}
|
||||
|
||||
clientset := fake.NewSimpleClientset(lease1, lease2, lease3, storageVersion)
|
||||
setupController(clientset)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
setupController(ctx, clientset)
|
||||
|
||||
// Delete the lease object and verify that storage version status is updated
|
||||
if err := clientset.CoordinationV1().Leases(metav1.NamespaceSystem).Delete(context.Background(), "kube-apiserver-1", metav1.DeleteOptions{}); err != nil {
|
||||
@ -177,7 +179,8 @@ func Test_StorageVersionUpdatedWithDifferentEncodingVersionsOnLeaseDeletion(t *t
|
||||
}
|
||||
|
||||
clientset := fake.NewSimpleClientset(lease1, lease2, lease3, storageVersion)
|
||||
setupController(clientset)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
setupController(ctx, clientset)
|
||||
|
||||
// Delete the lease object and verify that storage version status is updated
|
||||
if err := clientset.CoordinationV1().Leases(metav1.NamespaceSystem).Delete(context.Background(), "kube-apiserver-2", metav1.DeleteOptions{}); err != nil {
|
||||
@ -257,7 +260,8 @@ func Test_StorageVersionContainsInvalidLeaseID(t *testing.T) {
|
||||
}
|
||||
|
||||
clientset := fake.NewSimpleClientset(lease1, lease2, lease3, storageVersion)
|
||||
setupController(clientset)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
setupController(ctx, clientset)
|
||||
|
||||
// add a delay to ensure controller had a chance to reconcile
|
||||
time.Sleep(2 * time.Second)
|
||||
@ -325,7 +329,8 @@ func Test_StorageVersionDeletedOnLeaseDeletion(t *testing.T) {
|
||||
}
|
||||
|
||||
clientset := fake.NewSimpleClientset(lease1, storageVersion)
|
||||
setupController(clientset)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
setupController(ctx, clientset)
|
||||
|
||||
// Delete the lease object and verify that storage version status is updated
|
||||
if err := clientset.CoordinationV1().Leases(metav1.NamespaceSystem).Delete(context.Background(), "kube-apiserver-1", metav1.DeleteOptions{}); err != nil {
|
||||
|
@ -33,6 +33,7 @@ import (
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
|
||||
"k8s.io/kubernetes/pkg/controller/storageversiongc"
|
||||
"k8s.io/kubernetes/pkg/controlplane"
|
||||
@ -62,7 +63,8 @@ func TestStorageVersionGarbageCollection(t *testing.T) {
|
||||
leaseInformer := informers.Coordination().V1().Leases()
|
||||
storageVersionInformer := informers.Internal().V1alpha1().StorageVersions()
|
||||
|
||||
controller := storageversiongc.NewStorageVersionGC(kubeclient, leaseInformer, storageVersionInformer)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
controller := storageversiongc.NewStorageVersionGC(ctx, kubeclient, leaseInformer, storageVersionInformer)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
Loading…
Reference in New Issue
Block a user