From 33dca56ba8226b287bc6aeb4a6f6e90675c00320 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Mon, 2 May 2022 21:02:37 +0200 Subject: [PATCH] Destroy endpoints-reconciler lease storage on shutdown --- pkg/controlplane/controller.go | 1 + pkg/controlplane/instance.go | 8 +++--- pkg/controlplane/reconcilers/instancecount.go | 3 +++ pkg/controlplane/reconcilers/lease.go | 26 ++++++++++++++++--- pkg/controlplane/reconcilers/lease_test.go | 3 +++ pkg/controlplane/reconcilers/none.go | 3 +++ pkg/controlplane/reconcilers/reconcilers.go | 4 +++ 7 files changed, 40 insertions(+), 8 deletions(-) diff --git a/pkg/controlplane/controller.go b/pkg/controlplane/controller.go index 087f50a2875..8887ded5416 100644 --- a/pkg/controlplane/controller.go +++ b/pkg/controlplane/controller.go @@ -200,6 +200,7 @@ func (c *Controller) Stop() { if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err != nil { klog.Errorf("Unable to remove endpoints from kubernetes service: %v", err) } + c.EndpointReconciler.Destroy() }() select { diff --git a/pkg/controlplane/instance.go b/pkg/controlplane/instance.go index d82dbd5a278..8a6c246fad6 100644 --- a/pkg/controlplane/instance.go +++ b/pkg/controlplane/instance.go @@ -65,7 +65,6 @@ import ( genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/dynamiccertificates" serverstorage "k8s.io/apiserver/pkg/server/storage" - storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" @@ -259,13 +258,12 @@ func (c *Config) createLeaseReconciler() reconcilers.EndpointReconciler { ttl := c.ExtraConfig.MasterEndpointReconcileTTL config, err := c.ExtraConfig.StorageFactory.NewConfig(api.Resource("apiServerIPInfo")) if err != nil { - klog.Fatalf("Error determining service IP ranges: %v", err) + klog.Fatalf("Error creating storage factory config: %v", err) } - leaseStorage, _, err := storagefactory.Create(*config, nil) + masterLeases, err := reconcilers.NewLeases(config, "/masterleases/", ttl) if err != nil { - klog.Fatalf("Error creating storage factory: %v", err) + klog.Fatalf("Error creating leases: %v", err) } - masterLeases := reconcilers.NewLeases(leaseStorage, "/masterleases/", ttl) return reconcilers.NewLeaseEndpointReconciler(endpointsAdapter, masterLeases) } diff --git a/pkg/controlplane/reconcilers/instancecount.go b/pkg/controlplane/reconcilers/instancecount.go index f2fc6ad92f7..35178963a72 100644 --- a/pkg/controlplane/reconcilers/instancecount.go +++ b/pkg/controlplane/reconcilers/instancecount.go @@ -181,6 +181,9 @@ func (r *masterCountEndpointReconciler) StopReconciling() { r.stopReconcilingCalled = true } +func (r *masterCountEndpointReconciler) Destroy() { +} + // Determine if the endpoint is in the format ReconcileEndpoints expects. // // Return values: diff --git a/pkg/controlplane/reconcilers/lease.go b/pkg/controlplane/reconcilers/lease.go index 891ab31c4d7..9b07081677a 100644 --- a/pkg/controlplane/reconcilers/lease.go +++ b/pkg/controlplane/reconcilers/lease.go @@ -37,6 +37,8 @@ import ( apirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" "k8s.io/apiserver/pkg/storage" + "k8s.io/apiserver/pkg/storage/storagebackend" + storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory" endpointsv1 "k8s.io/kubernetes/pkg/api/v1/endpoints" ) @@ -50,10 +52,14 @@ type Leases interface { // RemoveLease removes a master's lease RemoveLease(ip string) error + + // Destroy cleans up everything on shutdown. + Destroy() } type storageLeases struct { storage storage.Interface + destroyFn func() baseKey string leaseTime time.Duration } @@ -117,13 +123,23 @@ func (s *storageLeases) RemoveLease(ip string) error { return s.storage.Delete(apirequest.NewDefaultContext(), s.baseKey+"/"+ip, &corev1.Endpoints{}, nil, rest.ValidateAllObjectFunc, nil) } +func (s *storageLeases) Destroy() { + s.destroyFn() +} + // NewLeases creates a new etcd-based Leases implementation. -func NewLeases(storage storage.Interface, baseKey string, leaseTime time.Duration) Leases { +func NewLeases(config *storagebackend.ConfigForResource, baseKey string, leaseTime time.Duration) (Leases, error) { + leaseStorage, destroyFn, err := storagefactory.Create(*config, nil) + if err != nil { + return nil, fmt.Errorf("error creating storage factory: %v", err) + } + var once sync.Once return &storageLeases{ - storage: storage, + storage: leaseStorage, + destroyFn: func() { once.Do(destroyFn) }, baseKey: baseKey, leaseTime: leaseTime, - } + }, nil } type leaseEndpointReconciler struct { @@ -308,3 +324,7 @@ func (r *leaseEndpointReconciler) StopReconciling() { defer r.reconcilingLock.Unlock() r.stopReconcilingCalled = true } + +func (r *leaseEndpointReconciler) Destroy() { + r.masterLeases.Destroy() +} diff --git a/pkg/controlplane/reconcilers/lease_test.go b/pkg/controlplane/reconcilers/lease_test.go index 1d27f1c0361..3f99897f222 100644 --- a/pkg/controlplane/reconcilers/lease_test.go +++ b/pkg/controlplane/reconcilers/lease_test.go @@ -77,6 +77,9 @@ func (f *fakeLeases) GetUpdatedKeys() []string { return res } +func (f *fakeLeases) Destroy() { +} + func TestLeaseEndpointReconciler(t *testing.T) { ns := corev1.NamespaceDefault om := func(name string, skipMirrorLabel bool) metav1.ObjectMeta { diff --git a/pkg/controlplane/reconcilers/none.go b/pkg/controlplane/reconcilers/none.go index 2eb49741bbd..e7ff7ea7e36 100644 --- a/pkg/controlplane/reconcilers/none.go +++ b/pkg/controlplane/reconcilers/none.go @@ -44,3 +44,6 @@ func (r *noneEndpointReconciler) RemoveEndpoints(serviceName string, ip net.IP, func (r *noneEndpointReconciler) StopReconciling() { } + +func (r *noneEndpointReconciler) Destroy() { +} diff --git a/pkg/controlplane/reconcilers/reconcilers.go b/pkg/controlplane/reconcilers/reconcilers.go index 8f280038765..0bc5856bebd 100644 --- a/pkg/controlplane/reconcilers/reconcilers.go +++ b/pkg/controlplane/reconcilers/reconcilers.go @@ -40,6 +40,10 @@ type EndpointReconciler interface { RemoveEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error // StopReconciling turns any later ReconcileEndpoints call into a noop. StopReconciling() + // Destroy shuts down all internal structures. + // Destroy needs to be implemented in thread-safe way and be prepared for being + // called more than once. + Destroy() } // Type the reconciler type