mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-20 10:20:51 +00:00
Destroy endpoints-reconciler lease storage on shutdown
This commit is contained in:
parent
b7c4d243de
commit
33dca56ba8
@ -200,6 +200,7 @@ func (c *Controller) Stop() {
|
||||
if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err != nil {
|
||||
klog.Errorf("Unable to remove endpoints from kubernetes service: %v", err)
|
||||
}
|
||||
c.EndpointReconciler.Destroy()
|
||||
}()
|
||||
|
||||
select {
|
||||
|
@ -65,7 +65,6 @@ import (
|
||||
genericapiserver "k8s.io/apiserver/pkg/server"
|
||||
"k8s.io/apiserver/pkg/server/dynamiccertificates"
|
||||
serverstorage "k8s.io/apiserver/pkg/server/storage"
|
||||
storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
@ -259,13 +258,12 @@ func (c *Config) createLeaseReconciler() reconcilers.EndpointReconciler {
|
||||
ttl := c.ExtraConfig.MasterEndpointReconcileTTL
|
||||
config, err := c.ExtraConfig.StorageFactory.NewConfig(api.Resource("apiServerIPInfo"))
|
||||
if err != nil {
|
||||
klog.Fatalf("Error determining service IP ranges: %v", err)
|
||||
klog.Fatalf("Error creating storage factory config: %v", err)
|
||||
}
|
||||
leaseStorage, _, err := storagefactory.Create(*config, nil)
|
||||
masterLeases, err := reconcilers.NewLeases(config, "/masterleases/", ttl)
|
||||
if err != nil {
|
||||
klog.Fatalf("Error creating storage factory: %v", err)
|
||||
klog.Fatalf("Error creating leases: %v", err)
|
||||
}
|
||||
masterLeases := reconcilers.NewLeases(leaseStorage, "/masterleases/", ttl)
|
||||
|
||||
return reconcilers.NewLeaseEndpointReconciler(endpointsAdapter, masterLeases)
|
||||
}
|
||||
|
@ -181,6 +181,9 @@ func (r *masterCountEndpointReconciler) StopReconciling() {
|
||||
r.stopReconcilingCalled = true
|
||||
}
|
||||
|
||||
func (r *masterCountEndpointReconciler) Destroy() {
|
||||
}
|
||||
|
||||
// Determine if the endpoint is in the format ReconcileEndpoints expects.
|
||||
//
|
||||
// Return values:
|
||||
|
@ -37,6 +37,8 @@ import (
|
||||
apirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||
storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory"
|
||||
endpointsv1 "k8s.io/kubernetes/pkg/api/v1/endpoints"
|
||||
)
|
||||
|
||||
@ -50,10 +52,14 @@ type Leases interface {
|
||||
|
||||
// RemoveLease removes a master's lease
|
||||
RemoveLease(ip string) error
|
||||
|
||||
// Destroy cleans up everything on shutdown.
|
||||
Destroy()
|
||||
}
|
||||
|
||||
type storageLeases struct {
|
||||
storage storage.Interface
|
||||
destroyFn func()
|
||||
baseKey string
|
||||
leaseTime time.Duration
|
||||
}
|
||||
@ -117,13 +123,23 @@ func (s *storageLeases) RemoveLease(ip string) error {
|
||||
return s.storage.Delete(apirequest.NewDefaultContext(), s.baseKey+"/"+ip, &corev1.Endpoints{}, nil, rest.ValidateAllObjectFunc, nil)
|
||||
}
|
||||
|
||||
func (s *storageLeases) Destroy() {
|
||||
s.destroyFn()
|
||||
}
|
||||
|
||||
// NewLeases creates a new etcd-based Leases implementation.
|
||||
func NewLeases(storage storage.Interface, baseKey string, leaseTime time.Duration) Leases {
|
||||
func NewLeases(config *storagebackend.ConfigForResource, baseKey string, leaseTime time.Duration) (Leases, error) {
|
||||
leaseStorage, destroyFn, err := storagefactory.Create(*config, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error creating storage factory: %v", err)
|
||||
}
|
||||
var once sync.Once
|
||||
return &storageLeases{
|
||||
storage: storage,
|
||||
storage: leaseStorage,
|
||||
destroyFn: func() { once.Do(destroyFn) },
|
||||
baseKey: baseKey,
|
||||
leaseTime: leaseTime,
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
type leaseEndpointReconciler struct {
|
||||
@ -308,3 +324,7 @@ func (r *leaseEndpointReconciler) StopReconciling() {
|
||||
defer r.reconcilingLock.Unlock()
|
||||
r.stopReconcilingCalled = true
|
||||
}
|
||||
|
||||
func (r *leaseEndpointReconciler) Destroy() {
|
||||
r.masterLeases.Destroy()
|
||||
}
|
||||
|
@ -77,6 +77,9 @@ func (f *fakeLeases) GetUpdatedKeys() []string {
|
||||
return res
|
||||
}
|
||||
|
||||
func (f *fakeLeases) Destroy() {
|
||||
}
|
||||
|
||||
func TestLeaseEndpointReconciler(t *testing.T) {
|
||||
ns := corev1.NamespaceDefault
|
||||
om := func(name string, skipMirrorLabel bool) metav1.ObjectMeta {
|
||||
|
@ -44,3 +44,6 @@ func (r *noneEndpointReconciler) RemoveEndpoints(serviceName string, ip net.IP,
|
||||
|
||||
func (r *noneEndpointReconciler) StopReconciling() {
|
||||
}
|
||||
|
||||
func (r *noneEndpointReconciler) Destroy() {
|
||||
}
|
||||
|
@ -40,6 +40,10 @@ type EndpointReconciler interface {
|
||||
RemoveEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error
|
||||
// StopReconciling turns any later ReconcileEndpoints call into a noop.
|
||||
StopReconciling()
|
||||
// Destroy shuts down all internal structures.
|
||||
// Destroy needs to be implemented in thread-safe way and be prepared for being
|
||||
// called more than once.
|
||||
Destroy()
|
||||
}
|
||||
|
||||
// Type the reconciler type
|
||||
|
Loading…
Reference in New Issue
Block a user