mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-20 10:20:51 +00:00
StorageVersionGC logger
Signed-off-by: songxiao-wang87 <wang.xiaosong23@zte.com.cn>
This commit is contained in:
parent
3f823c0daa
commit
9ae5af4b6a
@ -681,7 +681,9 @@ func setNodeCIDRMaskSizes(cfg nodeipamconfig.NodeIPAMControllerConfiguration, cl
|
|||||||
}
|
}
|
||||||
|
|
||||||
func startStorageVersionGCController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
func startStorageVersionGCController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||||
|
ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "storageVersionGC"))
|
||||||
go storageversiongc.NewStorageVersionGC(
|
go storageversiongc.NewStorageVersionGC(
|
||||||
|
ctx,
|
||||||
controllerContext.ClientBuilder.ClientOrDie("storage-version-garbage-collector"),
|
controllerContext.ClientBuilder.ClientOrDie("storage-version-garbage-collector"),
|
||||||
controllerContext.InformerFactory.Coordination().V1().Leases(),
|
controllerContext.InformerFactory.Coordination().V1().Leases(),
|
||||||
controllerContext.InformerFactory.Internal().V1alpha1().StorageVersions(),
|
controllerContext.InformerFactory.Internal().V1alpha1().StorageVersions(),
|
||||||
|
@ -55,7 +55,7 @@ type Controller struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewStorageVersionGC creates a new Controller.
|
// NewStorageVersionGC creates a new Controller.
|
||||||
func NewStorageVersionGC(clientset kubernetes.Interface, leaseInformer coordinformers.LeaseInformer, storageVersionInformer apiserverinternalinformers.StorageVersionInformer) *Controller {
|
func NewStorageVersionGC(ctx context.Context, clientset kubernetes.Interface, leaseInformer coordinformers.LeaseInformer, storageVersionInformer apiserverinternalinformers.StorageVersionInformer) *Controller {
|
||||||
c := &Controller{
|
c := &Controller{
|
||||||
kubeclientset: clientset,
|
kubeclientset: clientset,
|
||||||
leaseLister: leaseInformer.Lister(),
|
leaseLister: leaseInformer.Lister(),
|
||||||
@ -64,14 +64,20 @@ func NewStorageVersionGC(clientset kubernetes.Interface, leaseInformer coordinfo
|
|||||||
leaseQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "storage_version_garbage_collector_leases"),
|
leaseQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "storage_version_garbage_collector_leases"),
|
||||||
storageVersionQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "storage_version_garbage_collector_storageversions"),
|
storageVersionQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "storage_version_garbage_collector_storageversions"),
|
||||||
}
|
}
|
||||||
|
logger := klog.FromContext(ctx)
|
||||||
leaseInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
leaseInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||||
DeleteFunc: c.onDeleteLease,
|
DeleteFunc: func(obj interface{}) {
|
||||||
|
c.onDeleteLease(logger, obj)
|
||||||
|
},
|
||||||
})
|
})
|
||||||
// use the default resync period from the informer
|
// use the default resync period from the informer
|
||||||
storageVersionInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
storageVersionInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||||
AddFunc: c.onAddStorageVersion,
|
AddFunc: func(obj interface{}) {
|
||||||
UpdateFunc: c.onUpdateStorageVersion,
|
c.onAddStorageVersion(logger, obj)
|
||||||
|
},
|
||||||
|
UpdateFunc: func(old, newObj interface{}) {
|
||||||
|
c.onUpdateStorageVersion(logger, old, newObj)
|
||||||
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
return c
|
return c
|
||||||
@ -79,12 +85,13 @@ func NewStorageVersionGC(clientset kubernetes.Interface, leaseInformer coordinfo
|
|||||||
|
|
||||||
// Run starts one worker.
|
// Run starts one worker.
|
||||||
func (c *Controller) Run(ctx context.Context) {
|
func (c *Controller) Run(ctx context.Context) {
|
||||||
|
logger := klog.FromContext(ctx)
|
||||||
defer utilruntime.HandleCrash()
|
defer utilruntime.HandleCrash()
|
||||||
defer c.leaseQueue.ShutDown()
|
defer c.leaseQueue.ShutDown()
|
||||||
defer c.storageVersionQueue.ShutDown()
|
defer c.storageVersionQueue.ShutDown()
|
||||||
defer klog.Infof("Shutting down storage version garbage collector")
|
defer logger.Info("Shutting down storage version garbage collector")
|
||||||
|
|
||||||
klog.Infof("Starting storage version garbage collector")
|
logger.Info("Starting storage version garbage collector")
|
||||||
|
|
||||||
if !cache.WaitForCacheSync(ctx.Done(), c.leasesSynced, c.storageVersionSynced) {
|
if !cache.WaitForCacheSync(ctx.Done(), c.leasesSynced, c.storageVersionSynced) {
|
||||||
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
|
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
|
||||||
@ -215,31 +222,31 @@ func (c *Controller) syncStorageVersion(ctx context.Context, name string) error
|
|||||||
return c.updateOrDeleteStorageVersion(ctx, sv, serverStorageVersions)
|
return c.updateOrDeleteStorageVersion(ctx, sv, serverStorageVersions)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Controller) onAddStorageVersion(obj interface{}) {
|
func (c *Controller) onAddStorageVersion(logger klog.Logger, obj interface{}) {
|
||||||
castObj := obj.(*apiserverinternalv1alpha1.StorageVersion)
|
castObj := obj.(*apiserverinternalv1alpha1.StorageVersion)
|
||||||
c.enqueueStorageVersion(castObj)
|
c.enqueueStorageVersion(logger, castObj)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Controller) onUpdateStorageVersion(oldObj, newObj interface{}) {
|
func (c *Controller) onUpdateStorageVersion(logger klog.Logger, oldObj, newObj interface{}) {
|
||||||
castNewObj := newObj.(*apiserverinternalv1alpha1.StorageVersion)
|
castNewObj := newObj.(*apiserverinternalv1alpha1.StorageVersion)
|
||||||
c.enqueueStorageVersion(castNewObj)
|
c.enqueueStorageVersion(logger, castNewObj)
|
||||||
}
|
}
|
||||||
|
|
||||||
// enqueueStorageVersion enqueues the storage version if it has entry for invalid apiserver
|
// enqueueStorageVersion enqueues the storage version if it has entry for invalid apiserver
|
||||||
func (c *Controller) enqueueStorageVersion(obj *apiserverinternalv1alpha1.StorageVersion) {
|
func (c *Controller) enqueueStorageVersion(logger klog.Logger, obj *apiserverinternalv1alpha1.StorageVersion) {
|
||||||
for _, sv := range obj.Status.StorageVersions {
|
for _, sv := range obj.Status.StorageVersions {
|
||||||
lease, err := c.leaseLister.Leases(metav1.NamespaceSystem).Get(sv.APIServerID)
|
lease, err := c.leaseLister.Leases(metav1.NamespaceSystem).Get(sv.APIServerID)
|
||||||
if err != nil || lease == nil || lease.Labels == nil ||
|
if err != nil || lease == nil || lease.Labels == nil ||
|
||||||
lease.Labels[controlplane.IdentityLeaseComponentLabelKey] != controlplane.KubeAPIServer {
|
lease.Labels[controlplane.IdentityLeaseComponentLabelKey] != controlplane.KubeAPIServer {
|
||||||
// we cannot find a corresponding identity lease in cache, enqueue the storageversion
|
// we cannot find a corresponding identity lease in cache, enqueue the storageversion
|
||||||
klog.V(4).Infof("Observed storage version %s with invalid apiserver entry", obj.Name)
|
logger.V(4).Info("Observed storage version with invalid apiserver entry", "objName", obj.Name)
|
||||||
c.storageVersionQueue.Add(obj.Name)
|
c.storageVersionQueue.Add(obj.Name)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Controller) onDeleteLease(obj interface{}) {
|
func (c *Controller) onDeleteLease(logger klog.Logger, obj interface{}) {
|
||||||
castObj, ok := obj.(*coordinationv1.Lease)
|
castObj, ok := obj.(*coordinationv1.Lease)
|
||||||
if !ok {
|
if !ok {
|
||||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||||
@ -257,7 +264,7 @@ func (c *Controller) onDeleteLease(obj interface{}) {
|
|||||||
if castObj.Namespace == metav1.NamespaceSystem &&
|
if castObj.Namespace == metav1.NamespaceSystem &&
|
||||||
castObj.Labels != nil &&
|
castObj.Labels != nil &&
|
||||||
castObj.Labels[controlplane.IdentityLeaseComponentLabelKey] == controlplane.KubeAPIServer {
|
castObj.Labels[controlplane.IdentityLeaseComponentLabelKey] == controlplane.KubeAPIServer {
|
||||||
klog.V(4).Infof("Observed lease %s deleted", castObj.Name)
|
logger.V(4).Info("Observed lease deleted", "castObjName", castObj.Name)
|
||||||
c.enqueueLease(castObj)
|
c.enqueueLease(castObj)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -29,15 +29,16 @@ import (
|
|||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
"k8s.io/client-go/kubernetes"
|
"k8s.io/client-go/kubernetes"
|
||||||
"k8s.io/client-go/kubernetes/fake"
|
"k8s.io/client-go/kubernetes/fake"
|
||||||
|
"k8s.io/klog/v2/ktesting"
|
||||||
utilpointer "k8s.io/utils/pointer"
|
utilpointer "k8s.io/utils/pointer"
|
||||||
)
|
)
|
||||||
|
|
||||||
func setupController(clientset kubernetes.Interface) {
|
func setupController(ctx context.Context, clientset kubernetes.Interface) {
|
||||||
informerFactory := informers.NewSharedInformerFactory(clientset, 100*time.Millisecond)
|
informerFactory := informers.NewSharedInformerFactory(clientset, 100*time.Millisecond)
|
||||||
leaseInformer := informerFactory.Coordination().V1().Leases()
|
leaseInformer := informerFactory.Coordination().V1().Leases()
|
||||||
storageVersionInformer := informerFactory.Internal().V1alpha1().StorageVersions()
|
storageVersionInformer := informerFactory.Internal().V1alpha1().StorageVersions()
|
||||||
|
|
||||||
controller := NewStorageVersionGC(clientset, leaseInformer, storageVersionInformer)
|
controller := NewStorageVersionGC(ctx, clientset, leaseInformer, storageVersionInformer)
|
||||||
go controller.Run(context.Background())
|
go controller.Run(context.Background())
|
||||||
informerFactory.Start(nil)
|
informerFactory.Start(nil)
|
||||||
}
|
}
|
||||||
@ -93,7 +94,8 @@ func Test_StorageVersionUpdatedWithAllEncodingVersionsEqualOnLeaseDeletion(t *te
|
|||||||
}
|
}
|
||||||
|
|
||||||
clientset := fake.NewSimpleClientset(lease1, lease2, lease3, storageVersion)
|
clientset := fake.NewSimpleClientset(lease1, lease2, lease3, storageVersion)
|
||||||
setupController(clientset)
|
_, ctx := ktesting.NewTestContext(t)
|
||||||
|
setupController(ctx, clientset)
|
||||||
|
|
||||||
// Delete the lease object and verify that storage version status is updated
|
// Delete the lease object and verify that storage version status is updated
|
||||||
if err := clientset.CoordinationV1().Leases(metav1.NamespaceSystem).Delete(context.Background(), "kube-apiserver-1", metav1.DeleteOptions{}); err != nil {
|
if err := clientset.CoordinationV1().Leases(metav1.NamespaceSystem).Delete(context.Background(), "kube-apiserver-1", metav1.DeleteOptions{}); err != nil {
|
||||||
@ -177,7 +179,8 @@ func Test_StorageVersionUpdatedWithDifferentEncodingVersionsOnLeaseDeletion(t *t
|
|||||||
}
|
}
|
||||||
|
|
||||||
clientset := fake.NewSimpleClientset(lease1, lease2, lease3, storageVersion)
|
clientset := fake.NewSimpleClientset(lease1, lease2, lease3, storageVersion)
|
||||||
setupController(clientset)
|
_, ctx := ktesting.NewTestContext(t)
|
||||||
|
setupController(ctx, clientset)
|
||||||
|
|
||||||
// Delete the lease object and verify that storage version status is updated
|
// Delete the lease object and verify that storage version status is updated
|
||||||
if err := clientset.CoordinationV1().Leases(metav1.NamespaceSystem).Delete(context.Background(), "kube-apiserver-2", metav1.DeleteOptions{}); err != nil {
|
if err := clientset.CoordinationV1().Leases(metav1.NamespaceSystem).Delete(context.Background(), "kube-apiserver-2", metav1.DeleteOptions{}); err != nil {
|
||||||
@ -257,7 +260,8 @@ func Test_StorageVersionContainsInvalidLeaseID(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
clientset := fake.NewSimpleClientset(lease1, lease2, lease3, storageVersion)
|
clientset := fake.NewSimpleClientset(lease1, lease2, lease3, storageVersion)
|
||||||
setupController(clientset)
|
_, ctx := ktesting.NewTestContext(t)
|
||||||
|
setupController(ctx, clientset)
|
||||||
|
|
||||||
// add a delay to ensure controller had a chance to reconcile
|
// add a delay to ensure controller had a chance to reconcile
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
@ -325,7 +329,8 @@ func Test_StorageVersionDeletedOnLeaseDeletion(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
clientset := fake.NewSimpleClientset(lease1, storageVersion)
|
clientset := fake.NewSimpleClientset(lease1, storageVersion)
|
||||||
setupController(clientset)
|
_, ctx := ktesting.NewTestContext(t)
|
||||||
|
setupController(ctx, clientset)
|
||||||
|
|
||||||
// Delete the lease object and verify that storage version status is updated
|
// Delete the lease object and verify that storage version status is updated
|
||||||
if err := clientset.CoordinationV1().Leases(metav1.NamespaceSystem).Delete(context.Background(), "kube-apiserver-1", metav1.DeleteOptions{}); err != nil {
|
if err := clientset.CoordinationV1().Leases(metav1.NamespaceSystem).Delete(context.Background(), "kube-apiserver-1", metav1.DeleteOptions{}); err != nil {
|
||||||
|
@ -33,6 +33,7 @@ import (
|
|||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
"k8s.io/client-go/kubernetes"
|
"k8s.io/client-go/kubernetes"
|
||||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||||
|
"k8s.io/klog/v2/ktesting"
|
||||||
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
|
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
|
||||||
"k8s.io/kubernetes/pkg/controller/storageversiongc"
|
"k8s.io/kubernetes/pkg/controller/storageversiongc"
|
||||||
"k8s.io/kubernetes/pkg/controlplane"
|
"k8s.io/kubernetes/pkg/controlplane"
|
||||||
@ -62,7 +63,8 @@ func TestStorageVersionGarbageCollection(t *testing.T) {
|
|||||||
leaseInformer := informers.Coordination().V1().Leases()
|
leaseInformer := informers.Coordination().V1().Leases()
|
||||||
storageVersionInformer := informers.Internal().V1alpha1().StorageVersions()
|
storageVersionInformer := informers.Internal().V1alpha1().StorageVersions()
|
||||||
|
|
||||||
controller := storageversiongc.NewStorageVersionGC(kubeclient, leaseInformer, storageVersionInformer)
|
_, ctx := ktesting.NewTestContext(t)
|
||||||
|
controller := storageversiongc.NewStorageVersionGC(ctx, kubeclient, leaseInformer, storageVersionInformer)
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
Loading…
Reference in New Issue
Block a user